Python multiprocessing fork with improvements and bugfixes for distributed task processing
Quality: Pending — a best-practices review has not yet been performed.
Impact: Pending — no eval scenarios have been run.
Shared object managers for creating and managing shared objects across multiple processes with proxy-based access and automatic cleanup.
Create manager instances for sharing objects between processes.
def Manager() -> "SyncManager":
    """
    Create a SyncManager instance.

    Returns:
    SyncManager object for creating shared objects

    Note: the return annotation is a string (forward reference) because
    SyncManager is defined later in this document; an unquoted name would
    raise NameError when the function is defined.
    """

# Usage example:
from billiard import Process, Manager
import time


def worker_with_manager(shared_dict, shared_list, worker_id):
    """Worker that uses managed objects"""
    # Update shared dictionary
    shared_dict[f'worker_{worker_id}'] = f'Hello from worker {worker_id}'
    # Add to shared list
    shared_list.append(f'Item from worker {worker_id}')
    print(f"Worker {worker_id}: dict={dict(shared_dict)}")
    print(f"Worker {worker_id}: list={list(shared_list)}")


if __name__ == '__main__':
    # Create manager; the context manager shuts it down automatically
    with Manager() as manager:
        # Create shared objects through manager
        shared_dict = manager.dict()
        shared_list = manager.list()
        # Initialize shared objects
        shared_dict['initial'] = 'value'
        shared_list.extend([1, 2, 3])
        # Start worker processes
        processes = []
        for i in range(3):
            p = Process(target=worker_with_manager,
                        args=(shared_dict, shared_list, i))
            processes.append(p)
            p.start()
        # Wait for completion
        for p in processes:
            p.join()
        print(f"Final dict: {dict(shared_dict)}")
        print(f"Final list: {list(shared_list)}")

# Manager for synchronization primitives and shared objects.
class SyncManager:
    """
    Manager for shared objects and synchronization primitives.

    All methods below are API stubs documenting the billiard SyncManager
    surface; each returns a proxy to an object living in the manager process.
    """

    def start(self):
        """
        Start the manager process.
        """

    def shutdown(self):
        """
        Shutdown the manager process.
        """

    def dict(self, *args, **kwargs) -> dict:
        """
        Create a shared dictionary.

        Parameters:
        - *args, **kwargs: arguments for dict() constructor

        Returns:
        Proxy to shared dictionary
        """

    def list(self, sequence=()) -> list:
        """
        Create a shared list.

        Parameters:
        - sequence: initial sequence for list

        Returns:
        Proxy to shared list
        """

    def Namespace(self):
        """
        Create a shared namespace object.

        Returns:
        Proxy to shared namespace (object with arbitrary attributes)
        """

    def Value(self, typecode, value, lock=True):
        """
        Create a shared Value.

        Parameters:
        - typecode: ctypes typecode
        - value: initial value
        - lock: whether to use locking

        Returns:
        Proxy to shared value
        """

    def Array(self, typecode, sequence, lock=True):
        """
        Create a shared Array.

        Parameters:
        - typecode: ctypes typecode
        - sequence: initial sequence or size
        - lock: whether to use locking

        Returns:
        Proxy to shared array
        """

    def Queue(self, maxsize=0):
        """
        Create a shared Queue.

        Parameters:
        - maxsize: maximum queue size

        Returns:
        Proxy to shared queue
        """

    def JoinableQueue(self, maxsize=0):
        """
        Create a shared JoinableQueue.

        Parameters:
        - maxsize: maximum queue size

        Returns:
        Proxy to shared joinable queue
        """

    def Lock(self):
        """
        Create a shared Lock.

        Returns:
        Proxy to shared lock
        """

    def RLock(self):
        """
        Create a shared RLock.

        Returns:
        Proxy to shared recursive lock
        """

    def Semaphore(self, value=1):
        """
        Create a shared Semaphore.

        Parameters:
        - value: initial semaphore count

        Returns:
        Proxy to shared semaphore
        """

    def BoundedSemaphore(self, value=1):
        """
        Create a shared BoundedSemaphore.

        Parameters:
        - value: initial semaphore count

        Returns:
        Proxy to shared bounded semaphore
        """

    def Condition(self, lock=None):
        """
        Create a shared Condition.

        Parameters:
        - lock: underlying lock (creates new if None)

        Returns:
        Proxy to shared condition variable
        """

    def Event(self):
        """
        Create a shared Event.

        Returns:
        Proxy to shared event
        """

    def Barrier(self, parties, action=None, timeout=None):
        """
        Create a shared Barrier.

        Parameters:
        - parties: number of processes needed
        - action: callable to run when barrier releases
        - timeout: default timeout

        Returns:
        Proxy to shared barrier
        """

# Usage example:
from billiard import Process, Manager
import time
import random


def producer_with_manager(queue, event, stats):
    """Producer using managed objects"""
    for i in range(5):
        item = f"item_{i}"
        queue.put(item)
        stats['produced'] = stats.get('produced', 0) + 1
        print(f"Produced: {item}")
        time.sleep(random.uniform(0.1, 0.5))
    # Signal completion
    event.set()


def consumer_with_manager(queue, event, stats, consumer_id):
    """Consumer using managed objects"""
    while True:
        try:
            if not queue.empty():
                item = queue.get_nowait()
                stats[f'consumer_{consumer_id}'] = stats.get(f'consumer_{consumer_id}', 0) + 1
                print(f"Consumer {consumer_id} consumed: {item}")
                time.sleep(0.2)
            elif event.is_set():
                break
            else:
                time.sleep(0.1)
        except Exception:
            # Proxy calls can fail transiently (e.g. the queue races to empty
            # between empty() and get_nowait()); back off and retry.  A bare
            # `except:` here would also trap KeyboardInterrupt/SystemExit and
            # make the consumer unkillable.
            time.sleep(0.1)


def manager_coordination_example():
    """Demonstrate manager-based coordination"""
    with Manager() as manager:
        # Create managed objects
        shared_queue = manager.Queue()
        completion_event = manager.Event()
        stats = manager.dict()
        # Start processes
        processes = []
        # Producer
        prod = Process(target=producer_with_manager,
                       args=(shared_queue, completion_event, stats))
        processes.append(prod)
        prod.start()
        # Consumers
        for i in range(2):
            cons = Process(target=consumer_with_manager,
                           args=(shared_queue, completion_event, stats, i))
            processes.append(cons)
            cons.start()
        # Wait for completion
        for p in processes:
            p.join()
        print(f"Final stats: {dict(stats)}")


if __name__ == '__main__':
    manager_coordination_example()

# Create custom managers for specialized shared objects.
class BaseManager:
    """
    Base class for creating custom managers.

    These are API stubs documenting the billiard BaseManager surface.
    """

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        """
        Create a BaseManager.

        Parameters:
        - address: address for manager server
        - authkey: authentication key
        - serializer: serialization method
        """

    def start(self, initializer=None, initargs=()):
        """
        Start the manager process.

        Parameters:
        - initializer: callable to run on manager startup
        - initargs: arguments for initializer
        """

    def shutdown(self):
        """
        Shutdown the manager.
        """

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        """
        Register a type with the manager.

        Parameters:
        - typeid: string identifier for the type
        - callable: callable that returns the object
        - proxytype: proxy class for the object
        - exposed: list of exposed methods/attributes
        - method_to_typeid: mapping of method names to typeids
        - create_method: whether to create a method on manager
        """

# Usage example:
from billiard import Process
from billiard.managers import BaseManager
import time
import threading
# Custom shared object
class Counter:
    """A lock-protected integer counter intended for sharing via a manager."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def _adjust(self, delta):
        # Every mutation is serialized behind the lock.
        with self._lock:
            self._value += delta

    def increment(self):
        """Increase the count by one."""
        self._adjust(1)

    def decrement(self):
        """Decrease the count by one."""
        self._adjust(-1)

    def get_value(self):
        """Return the current count, read under the lock."""
        with self._lock:
            return self._value
# Custom manager
class CustomManager(BaseManager):
    """Manager subclass used to expose project-specific shared types."""


# Register the Counter class so that manager.Counter() creates proxies.
CustomManager.register('Counter', Counter)
def worker_with_custom_manager(counter, worker_id, operations):
    """Worker using custom managed object"""
    # Alternate increment/decrement: even steps add, odd steps subtract.
    for step in range(operations):
        if step % 2 == 0:
            counter.increment()
            print(f"Worker {worker_id}: incremented to {counter.get_value()}")
        else:
            counter.decrement()
            print(f"Worker {worker_id}: decremented to {counter.get_value()}")
        time.sleep(0.1)
def custom_manager_example():
    """Demonstrate custom manager usage"""
    with CustomManager() as manager:
        # Create custom managed object (proxy to a Counter in the manager)
        counter = manager.Counter()
        print(f"Initial counter value: {counter.get_value()}")
        # Start worker processes
        processes = []
        for i in range(3):
            p = Process(target=worker_with_custom_manager,
                        args=(counter, i, 5))
            processes.append(p)
            p.start()
        # Wait for completion
        for p in processes:
            p.join()
        print(f"Final counter value: {counter.get_value()}")


if __name__ == '__main__':
    custom_manager_example()

# --- Shared cache example ---
from billiard import Process, Manager
import time
import random


def cache_worker(cache, lock, worker_id):
    """Worker that uses shared cache"""
    for i in range(5):
        key = f"key_{random.randint(1, 10)}"
        # The lock makes the check-then-set on the shared dict atomic.
        with lock:
            if key in cache:
                value = cache[key]
                print(f"Worker {worker_id}: cache hit for {key} = {value}")
            else:
                # Simulate expensive computation
                value = random.randint(100, 999)
                cache[key] = value
                print(f"Worker {worker_id}: cache miss, computed {key} = {value}")
        # Sleep outside the lock so other workers are not blocked.
        time.sleep(0.2)


def shared_cache_example():
    """Demonstrate shared cache using manager"""
    with Manager() as manager:
        cache = manager.dict()
        cache_lock = manager.Lock()
        # Start workers
        processes = []
        for i in range(4):
            p = Process(target=cache_worker, args=(cache, cache_lock, i))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        print(f"Final cache contents: {dict(cache)}")


if __name__ == '__main__':
    shared_cache_example()

# --- Work distribution example ---
from billiard import Process, Manager
import time
import random


def work_distributor(task_queue, result_dict, num_tasks):
    """Distribute tasks to workers"""
    for i in range(num_tasks):
        task = {
            'id': i,
            'data': random.randint(1, 100),
            'operation': random.choice(['square', 'cube', 'double'])
        }
        task_queue.put(task)
    # Add termination signals, one sentinel per worker
    for _ in range(3):  # Number of workers
        task_queue.put(None)


def worker_processor(task_queue, result_dict, worker_id):
    """Process tasks from queue"""
    while True:
        task = task_queue.get()
        if task is None:
            # Sentinel: no more work for this worker
            break
        # Process task
        data = task['data']
        if task['operation'] == 'square':
            result = data ** 2
        elif task['operation'] == 'cube':
            result = data ** 3
        else:  # double
            result = data * 2
        result_dict[task['id']] = {
            'input': data,
            'operation': task['operation'],
            'result': result,
            'worker': worker_id
        }
        print(f"Worker {worker_id}: processed task {task['id']}")
        time.sleep(0.1)


def work_distribution_example():
    """Demonstrate work distribution pattern"""
    with Manager() as manager:
        task_queue = manager.Queue()
        results = manager.dict()
        # Start distributor
        distributor = Process(target=work_distributor,
                              args=(task_queue, results, 15))
        distributor.start()
        # Start workers
        workers = []
        for i in range(3):
            worker = Process(target=worker_processor,
                             args=(task_queue, results, i))
            workers.append(worker)
            worker.start()
        # Wait for completion
        distributor.join()
        for worker in workers:
            worker.join()
        # Display results
        print(f"Processed {len(results)} tasks:")
        for task_id, result in sorted(results.items()):
            print(f"Task {task_id}: {result['input']} {result['operation']} = "
                  f"{result['result']} (worker {result['worker']})")


if __name__ == '__main__':
    work_distribution_example()

# Tip: use the context-manager form (``with Manager() as manager:``)
# for automatic cleanup.
# Install with Tessl CLI:
npx tessl i tessl/pypi-billiard