Billiard: a Python multiprocessing fork with improvements and bug fixes, used for distributed task processing.
Quality: pending — a best-practices review has not yet been performed.
Impact: pending — no eval scenarios have been run.
Synchronization primitives including locks, semaphores, events, conditions, and barriers for coordinating processes and ensuring thread-safe access to shared resources.
Mutual exclusion primitives for protecting critical sections and shared resources.
class Lock:
    """A non-recursive mutual exclusion lock.

    At most one process can hold the lock at any moment; attempting to
    acquire it again before release blocks the caller.
    """

    def acquire(self, block=True, timeout=None) -> bool:
        """Take the lock.

        Parameters:
        - block: whether to wait when the lock is currently unavailable
        - timeout: maximum seconds to wait (None waits indefinitely)

        Returns:
        True if the lock was acquired, False otherwise.
        """

    def release(self):
        """Give the lock back.

        Raises:
        - ValueError: if the lock is not currently held
        """

    def __enter__(self):
        """Acquire the lock on entry to a ``with`` block."""
        return self.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lock on exit from a ``with`` block."""
        self.release()
class RLock:
    """A reentrant (recursive) lock.

    The owning process may acquire it repeatedly; the lock is only freed
    once release() has been called as many times as acquire().
    """

    def acquire(self, block=True, timeout=None) -> bool:
        """Take the lock, bumping the recursion level if already owned.

        Parameters:
        - block: whether to wait when the lock is currently unavailable
        - timeout: maximum seconds to wait (None waits indefinitely)

        Returns:
        True if the lock was acquired, False otherwise.
        """

    def release(self):
        """Drop one recursion level, freeing the lock at level zero.

        Raises:
        - ValueError: if the calling process does not hold the lock
        """

    def __enter__(self):
        """Acquire the lock on entry to a ``with`` block."""
        return self.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lock on exit from a ``with`` block."""
        self.release()

# Usage example:
from billiard import Process, Lock, RLock, Value
import time

# BUGFIX: a plain module-level int is NOT shared between processes —
# each worker would increment its own copy and the parent would still
# print 0 at the end.  A shared Value('i') lives in shared memory and
# is visible to every process.
counter = Value('i', 0)
lock = Lock()

def worker_with_lock(name, iterations, shared_lock, shared_counter):
    """Safely increment the shared counter under the lock.

    Parameters:
    - name: label used in progress messages
    - iterations: how many increments to perform
    - shared_lock: Lock guarding the read-modify-write
    - shared_counter: shared Value('i') updated by all workers
    """
    for _ in range(iterations):
        # Critical section: the read-modify-write must not interleave.
        with shared_lock:
            old_value = shared_counter.value
            time.sleep(0.001)  # Simulate some work
            shared_counter.value = old_value + 1
            print(f"{name}: incremented counter to {shared_counter.value}")

def recursive_function(rlock, level):
    """Acquire the (reentrant) lock recursively, once per level."""
    with rlock:
        print(f"Level {level}: acquired lock")
        if level > 0:
            recursive_function(rlock, level - 1)
        print(f"Level {level}: releasing lock")

if __name__ == '__main__':
    # Test regular lock
    processes = []
    for i in range(3):
        p = Process(target=worker_with_lock,
                    args=(f"Worker-{i}", 5, lock, counter))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    # With a shared Value this now correctly reports 15 (3 workers x 5).
    print(f"Final counter value: {counter.value}")

    # Test recursive lock
    rlock = RLock()
    process = Process(target=recursive_function, args=(rlock, 3))
    process.start()
    process.join()

# Counting semaphores for controlling access to resources with limited capacity.
class Semaphore:
    """A counting semaphore guarding a pool of identical resources."""

    def __init__(self, value=1, ctx=None):
        """Create the semaphore.

        Parameters:
        - value: initial counter value (default: 1)
        - ctx: multiprocessing context
        """

    def acquire(self, block=True, timeout=None) -> bool:
        """Decrement the internal counter, waiting while it is zero.

        Parameters:
        - block: whether to wait when the counter is exhausted
        - timeout: maximum seconds to wait (None waits indefinitely)

        Returns:
        True if the semaphore was acquired, False otherwise.
        """

    def release(self):
        """Increment the internal counter, waking one blocked waiter."""

    def __enter__(self):
        """Acquire the semaphore on entry to a ``with`` block."""
        return self.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the semaphore on exit from a ``with`` block."""
        self.release()
class BoundedSemaphore(Semaphore):
    """A semaphore whose counter may never exceed its initial value."""

    def release(self):
        """Increment the counter, refusing to pass the initial value.

        Raises:
        - ValueError: if releasing would push the counter above the
          value the semaphore was created with
        """

# Usage example:
from billiard import Process, Semaphore, BoundedSemaphore
import time
import random

# Semaphore limiting concurrent access to 3 resources
resource_semaphore = Semaphore(3)

def worker_with_semaphore(worker_id, semaphore):
    """Worker that borrows one slot of a limited resource pool."""
    print(f"Worker {worker_id}: requesting resource...")
    with semaphore:
        print(f"Worker {worker_id}: acquired resource")
        # Simulate work with resource
        duration = random.uniform(0.5, 2.0)
        time.sleep(duration)
        print(f"Worker {worker_id}: releasing resource after {duration:.1f}s")
    print(f"Worker {worker_id}: done")

def bounded_semaphore_example():
    """Show that a BoundedSemaphore rejects releases beyond its cap."""
    bounded_sem = BoundedSemaphore(2)
    # Acquire twice (should work)
    for _ in range(2):
        bounded_sem.acquire()
    print("Acquired semaphore twice")
    # Release twice
    for _ in range(2):
        bounded_sem.release()
    print("Released semaphore twice")
    # A third release must raise ValueError
    try:
        bounded_sem.release()
    except ValueError as e:
        print(f"Bounded semaphore prevented over-release: {e}")

if __name__ == '__main__':
    # Test resource limiting: eight workers contend for three slots.
    workers = []
    for i in range(8):
        proc = Process(target=worker_with_semaphore,
                       args=(i, resource_semaphore))
        workers.append(proc)
        proc.start()
    for proc in workers:
        proc.join()
    # Test bounded semaphore
    bounded_semaphore_example()

# Simple signaling mechanism for coordinating processes.
class Event:
    """A shared boolean flag processes can set, clear, and wait on."""

    def set(self):
        """Turn the flag on, unblocking every process waiting on it."""

    def clear(self):
        """Turn the flag off."""

    def is_set(self) -> bool:
        """Report the current flag state.

        Returns:
        True when the flag is on, False otherwise.
        """

    def wait(self, timeout=None) -> bool:
        """Block until the flag is turned on.

        Parameters:
        - timeout: maximum seconds to wait (None waits indefinitely)

        Returns:
        True if the flag was set, False if the timeout elapsed first.
        """

# Usage example:
from billiard import Process, Event
import time
import random

def waiter(event, worker_id):
    """Process that blocks until the event is set or a timeout expires."""
    print(f"Waiter {worker_id}: waiting for event...")
    received = event.wait(timeout=5)
    if received:
        print(f"Waiter {worker_id}: event received!")
    else:
        print(f"Waiter {worker_id}: timeout waiting for event")

def setter(event, delay):
    """Process that sets the event after sleeping for ``delay`` seconds."""
    print(f"Setter: waiting {delay} seconds before setting event")
    time.sleep(delay)
    print("Setter: setting event")
    event.set()

def event_coordination_example():
    """Show one setter releasing several blocked waiters at once."""
    event = Event()
    # Start multiple waiters
    waiters = []
    for i in range(3):
        proc = Process(target=waiter, args=(event, i))
        waiters.append(proc)
        proc.start()
    # Start setter with random delay
    delay = random.uniform(1, 3)
    setter_process = Process(target=setter, args=(event, delay))
    setter_process.start()
    # Wait for all processes
    setter_process.join()
    for proc in waiters:
        proc.join()
    print(f"Event is set: {event.is_set()}")
    # Clear and test again
    event.clear()
    print(f"Event after clear: {event.is_set()}")

if __name__ == '__main__':
    event_coordination_example()

# Advanced synchronization allowing processes to wait for specific conditions.
class Condition:
    """A condition variable built on top of a lock."""

    def __init__(self, lock=None, ctx=None):
        """Create the condition variable.

        Parameters:
        - lock: the lock to build on (a fresh Lock() when None)
        - ctx: multiprocessing context
        """

    def acquire(self, block=True, timeout=None) -> bool:
        """Take the underlying lock.

        Parameters:
        - block: whether to wait when the lock is unavailable
        - timeout: maximum seconds to wait (None waits indefinitely)

        Returns:
        True if the lock was acquired, False otherwise.
        """

    def release(self):
        """Give back the underlying lock."""

    def wait(self, timeout=None) -> bool:
        """Sleep until notified or the timeout elapses.

        The underlying lock must be held when calling this.

        Parameters:
        - timeout: maximum seconds to wait (None waits indefinitely)

        Returns:
        True if woken by a notification, False on timeout.
        """

    def notify(self, n=1):
        """Wake at most ``n`` processes waiting on this condition.

        The underlying lock must be held when calling this.

        Parameters:
        - n: maximum number of processes to wake
        """

    def notify_all(self):
        """Wake every process waiting on this condition.

        The underlying lock must be held when calling this.
        """

    def __enter__(self):
        """Acquire the underlying lock on entry to a ``with`` block."""
        return self.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the underlying lock on exit from a ``with`` block."""
        self.release()

# Usage example:
from billiard import Process, Condition, Manager
import time
import random

def consumer(consumer_id, condition, items):
    """Wait until an item is available, then consume exactly one.

    Parameters:
    - consumer_id: label used in progress messages
    - condition: Condition guarding access to ``items``
    - items: shared (Manager-backed) list of produced items
    """
    with condition:
        # Re-check the predicate after every wakeup (spurious wakeups,
        # or another consumer may have taken the item first).
        while len(items) == 0:
            print(f"Consumer {consumer_id}: waiting for items...")
            condition.wait()
        item = items.pop(0)
        print(f"Consumer {consumer_id}: consumed {item}")

def producer(producer_id, condition, items):
    """Produce three items, notifying one waiting consumer per item."""
    for i in range(3):
        item = f"item-{producer_id}-{i}"
        time.sleep(random.uniform(0.5, 1.5))
        with condition:
            items.append(item)
            print(f"Producer {producer_id}: produced {item}")
            condition.notify()  # Wake up one consumer

if __name__ == '__main__':
    # BUGFIX: a plain Python list is copied into each child process, so
    # producers and consumers would never see each other's changes and
    # the consumers would block in wait() forever.  A Manager-backed
    # list lives in a server process and is genuinely shared.
    manager = Manager()
    items = manager.list()
    condition = Condition()
    # Start consumers
    consumers = []
    for i in range(2):
        p = Process(target=consumer, args=(i, condition, items))
        consumers.append(p)
        p.start()
    # Start producers
    producers = []
    for i in range(2):
        p = Process(target=producer, args=(i, condition, items))
        producers.append(p)
        p.start()
    # Wait for completion
    for p in producers:
        p.join()
    # Notify remaining consumers to check final state
    with condition:
        condition.notify_all()
    for p in consumers:
        p.join()

# Synchronization point where processes wait for all participants to arrive.
class Barrier:
    """A synchronization point for a fixed number of processes."""

    def __init__(self, parties, action=None, timeout=None, ctx=None):
        """Create the barrier.

        Parameters:
        - parties: how many processes must call wait() to trip the barrier
        - action: callable invoked once when the barrier is released
        - timeout: default timeout applied by wait()
        - ctx: multiprocessing context
        """

    def wait(self, timeout=None) -> int:
        """Block until all parties have reached the barrier.

        Parameters:
        - timeout: maximum seconds to wait (None uses the barrier default)

        Returns:
        A distinct index in range(parties) identifying this process.

        Raises:
        - BrokenBarrierError: if the barrier breaks or is reset while waiting
        """

    def reset(self):
        """Return the barrier to its initial, empty state.

        Processes currently waiting receive BrokenBarrierError.
        """

    def abort(self):
        """Mark the barrier as broken.

        Every current and future wait() call raises BrokenBarrierError.
        """

    @property
    def parties(self) -> int:
        """How many processes are required to trip the barrier."""

    @property
    def n_waiting(self) -> int:
        """How many processes are currently blocked in wait()."""

    @property
    def broken(self) -> bool:
        """Whether the barrier is in the broken state."""

# Usage example:
from billiard import Process, Barrier
import time
import random

def barrier_action():
    """Runs once each time every worker reaches the barrier."""
    print("*** All processes synchronized - continuing! ***")

def worker_with_barrier(worker_id, barrier, phase_count):
    """Worker that synchronizes with its peers between work phases."""
    for phase in range(phase_count):
        # Do some work
        duration = random.uniform(0.5, 2.0)
        print(f"Worker {worker_id}: working on phase {phase} for {duration:.1f}s")
        time.sleep(duration)
        print(f"Worker {worker_id}: finished phase {phase}, waiting at barrier")
        # Wait for all workers to complete this phase
        try:
            index = barrier.wait(timeout=5)
            print(f"Worker {worker_id}: barrier passed (index {index})")
        except Exception as e:
            print(f"Worker {worker_id}: barrier error: {e}")
            break
    print(f"Worker {worker_id}: all phases complete")

if __name__ == '__main__':
    num_workers = 4
    num_phases = 3
    # Create barrier for all workers
    barrier = Barrier(num_workers, action=barrier_action)
    print(f"Barrier created for {barrier.parties} processes")
    # Start workers
    workers = []
    for i in range(num_workers):
        proc = Process(target=worker_with_barrier,
                       args=(i, barrier, num_phases))
        workers.append(proc)
        proc.start()
    # Monitor barrier state
    for phase in range(num_phases):
        time.sleep(1)
        print(f"Phase {phase}: {barrier.n_waiting} processes waiting, "
              f"broken: {barrier.broken}")
    # Wait for all workers to complete
    for proc in workers:
        proc.join()
    print("All workers completed")

# Use context managers (``with`` statements) when possible to ensure
# locks are properly released.
# Install with Tessl CLI:
npx tessl i tessl/pypi-billiard