tessl/pypi-billiard

Python multiprocessing fork with improvements and bugfixes for distributed task processing

docs/managers.md

Managers

Managers create and hold objects that multiple processes access through proxies, with automatic cleanup when the manager shuts down.

Capabilities

Manager Creation

Create manager instances for sharing objects between processes.

def Manager() -> SyncManager:
    """
    Create a SyncManager instance.
    
    Returns:
    SyncManager object for creating shared objects
    """

Usage example:

from billiard import Process, Manager
import time

def worker_with_manager(shared_dict, shared_list, worker_id):
    """Worker that uses managed objects"""
    # Update shared dictionary
    shared_dict[f'worker_{worker_id}'] = f'Hello from worker {worker_id}'
    
    # Add to shared list
    shared_list.append(f'Item from worker {worker_id}')
    
    print(f"Worker {worker_id}: dict={dict(shared_dict)}")
    print(f"Worker {worker_id}: list={list(shared_list)}")

if __name__ == '__main__':
    # Create manager
    with Manager() as manager:
        # Create shared objects through manager
        shared_dict = manager.dict()
        shared_list = manager.list()
        
        # Initialize shared objects
        shared_dict['initial'] = 'value'
        shared_list.extend([1, 2, 3])
        
        # Start worker processes
        processes = []
        for i in range(3):
            p = Process(target=worker_with_manager, 
                       args=(shared_dict, shared_list, i))
            processes.append(p)
            p.start()
        
        # Wait for completion
        for p in processes:
            p.join()
        
        print(f"Final dict: {dict(shared_dict)}")
        print(f"Final list: {list(shared_list)}")

SyncManager

Manager for synchronization primitives and shared objects.

class SyncManager:
    """
    Manager for shared objects and synchronization primitives.
    """
    def start(self):
        """
        Start the manager process.
        """
    
    def shutdown(self):
        """
        Shutdown the manager process.
        """
    
    def dict(self, *args, **kwargs) -> dict:
        """
        Create a shared dictionary.
        
        Parameters:
        - *args, **kwargs: arguments for dict() constructor
        
        Returns:
        Proxy to shared dictionary
        """
    
    def list(self, sequence=()) -> list:
        """
        Create a shared list.
        
        Parameters:
        - sequence: initial sequence for list
        
        Returns:
        Proxy to shared list
        """
    
    def Namespace(self):
        """
        Create a shared namespace object.
        
        Returns:
        Proxy to shared namespace (object with arbitrary attributes)
        """
    
    def Value(self, typecode, value, lock=True):
        """
        Create a shared Value.
        
        Parameters:
        - typecode: ctypes typecode
        - value: initial value
        - lock: whether to use locking
        
        Returns:
        Proxy to shared value
        """
    
    def Array(self, typecode, sequence, lock=True):
        """
        Create a shared Array.
        
        Parameters:
        - typecode: ctypes typecode
        - sequence: initial sequence or size
        - lock: whether to use locking
        
        Returns:
        Proxy to shared array
        """
    
    def Queue(self, maxsize=0):
        """
        Create a shared Queue.
        
        Parameters:
        - maxsize: maximum queue size
        
        Returns:
        Proxy to shared queue
        """
    
    def JoinableQueue(self, maxsize=0):
        """
        Create a shared JoinableQueue.
        
        Parameters:
        - maxsize: maximum queue size
        
        Returns:
        Proxy to shared joinable queue
        """
    
    def Lock(self):
        """
        Create a shared Lock.
        
        Returns:
        Proxy to shared lock
        """
    
    def RLock(self):
        """
        Create a shared RLock.
        
        Returns:
        Proxy to shared recursive lock
        """
    
    def Semaphore(self, value=1):
        """
        Create a shared Semaphore.
        
        Parameters:
        - value: initial semaphore count
        
        Returns:
        Proxy to shared semaphore
        """
    
    def BoundedSemaphore(self, value=1):
        """
        Create a shared BoundedSemaphore.
        
        Parameters:
        - value: initial semaphore count
        
        Returns:
        Proxy to shared bounded semaphore
        """
    
    def Condition(self, lock=None):
        """
        Create a shared Condition.
        
        Parameters:
        - lock: underlying lock (creates new if None)
        
        Returns:
        Proxy to shared condition variable
        """
    
    def Event(self):
        """
        Create a shared Event.
        
        Returns:
        Proxy to shared event
        """
    
    def Barrier(self, parties, action=None, timeout=None):
        """
        Create a shared Barrier.
        
        Parameters:
        - parties: number of processes needed
        - action: callable to run when barrier releases
        - timeout: default timeout
        
        Returns:
        Proxy to shared barrier
        """

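The Namespace, Value, and Array factories above are not covered by the usage example below; a minimal sketch of all three (it falls back to the stdlib multiprocessing module when billiard is unavailable, since billiard mirrors that manager API):

```python
try:
    from billiard import Manager
except ImportError:
    # billiard mirrors the stdlib multiprocessing manager API
    from multiprocessing import Manager

def namespace_value_array_demo():
    with Manager() as manager:
        # Namespace: an object with arbitrary attributes
        ns = manager.Namespace()
        ns.status = 'running'

        # Value: a single typed value ('i' = C int), accessed via .value
        counter = manager.Value('i', 0)
        counter.value += 1

        # Array: a fixed-length typed sequence ('d' = C double)
        samples = manager.Array('d', [0.0, 1.5, 3.0])
        samples[0] = 9.9

        return ns.status, counter.value, list(samples)

if __name__ == '__main__':
    print(namespace_value_array_demo())
```

Note that `counter.value += 1` is a separate read and write, so it is not atomic across processes; guard it with a manager lock if several workers update it.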
Usage example:

from billiard import Process, Manager
from queue import Empty
import time
import random

def producer_with_manager(queue, event, stats):
    """Producer using managed objects"""
    for i in range(5):
        item = f"item_{i}"
        queue.put(item)
        stats['produced'] = stats.get('produced', 0) + 1
        print(f"Produced: {item}")
        time.sleep(random.uniform(0.1, 0.5))
    
    # Signal completion
    event.set()

def consumer_with_manager(queue, event, stats, consumer_id):
    """Consumer using managed objects"""
    while True:
        try:
            # Block briefly instead of polling queue.empty(), which is racy
            item = queue.get(timeout=0.1)
        except Empty:
            if event.is_set():
                break
            continue
        # Note: this read-modify-write on the proxy is not atomic across
        # consumers; acceptable for a demo, use a lock for exact counts
        stats[f'consumer_{consumer_id}'] = stats.get(f'consumer_{consumer_id}', 0) + 1
        print(f"Consumer {consumer_id} consumed: {item}")
        time.sleep(0.2)

def manager_coordination_example():
    """Demonstrate manager-based coordination"""
    with Manager() as manager:
        # Create managed objects
        shared_queue = manager.Queue()
        completion_event = manager.Event()
        stats = manager.dict()
        
        # Start processes
        processes = []
        
        # Producer
        prod = Process(target=producer_with_manager, 
                      args=(shared_queue, completion_event, stats))
        processes.append(prod)
        prod.start()
        
        # Consumers
        for i in range(2):
            cons = Process(target=consumer_with_manager,
                          args=(shared_queue, completion_event, stats, i))
            processes.append(cons)
            cons.start()
        
        # Wait for completion
        for p in processes:
            p.join()
        
        print(f"Final stats: {dict(stats)}")

if __name__ == '__main__':
    manager_coordination_example()

Custom Managers

Create custom managers for specialized shared objects.

class BaseManager:
    """
    Base class for creating custom managers.
    """
    def __init__(self, address=None, authkey=None, serializer='pickle'):
        """
        Create a BaseManager.
        
        Parameters:
        - address: address for manager server
        - authkey: authentication key
        - serializer: serialization method
        """
    
    def start(self, initializer=None, initargs=()):
        """
        Start the manager process.
        
        Parameters:
        - initializer: callable to run on manager startup
        - initargs: arguments for initializer
        """
    
    def shutdown(self):
        """
        Shutdown the manager.
        """
    
    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None, 
                method_to_typeid=None, create_method=True):
        """
        Register a type with the manager.
        
        Parameters:
        - typeid: string identifier for the type
        - callable: callable that returns the object
        - proxytype: proxy class for the object
        - exposed: list of exposed methods/attributes
        - method_to_typeid: mapping of method names to typeids
        - create_method: whether to create a method on manager
        """

Usage example:

from billiard import Process
from billiard.managers import BaseManager
import time
import threading

# Custom shared object
class Counter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()
    
    def increment(self):
        with self._lock:
            self._value += 1
    
    def decrement(self):
        with self._lock:
            self._value -= 1
    
    def get_value(self):
        with self._lock:
            return self._value

# Custom manager
class CustomManager(BaseManager):
    pass

# Register the Counter class
CustomManager.register('Counter', Counter)

def worker_with_custom_manager(counter, worker_id, operations):
    """Worker using custom managed object"""
    for i in range(operations):
        if i % 2 == 0:
            counter.increment()
            print(f"Worker {worker_id}: incremented to {counter.get_value()}")
        else:
            counter.decrement()
            print(f"Worker {worker_id}: decremented to {counter.get_value()}")
        time.sleep(0.1)

def custom_manager_example():
    """Demonstrate custom manager usage"""
    with CustomManager() as manager:
        # Create custom managed object
        counter = manager.Counter()
        
        print(f"Initial counter value: {counter.get_value()}")
        
        # Start worker processes
        processes = []
        for i in range(3):
            p = Process(target=worker_with_custom_manager,
                       args=(counter, i, 5))
            processes.append(p)
            p.start()
        
        # Wait for completion
        for p in processes:
            p.join()
        
        print(f"Final counter value: {counter.get_value()}")

if __name__ == '__main__':
    custom_manager_example()
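The address and authkey parameters of BaseManager also allow serving objects over a socket: the server registers a typeid with a callable, while a client registers the same typeid without one and calls connect(). A self-contained sketch of that pattern (the background thread and port 0 are only there to keep it single-process; the get_queue typeid is illustrative, and the stdlib fallback is used because billiard mirrors this API):

```python
import queue
import threading

try:
    from billiard.managers import BaseManager
except ImportError:
    # billiard mirrors the stdlib multiprocessing.managers API
    from multiprocessing.managers import BaseManager

task_queue = queue.Queue()

class ServerManager(BaseManager):
    pass

# Server side: register a callable that returns the shared object
ServerManager.register('get_queue', callable=lambda: task_queue)

class ClientManager(BaseManager):
    pass

# Client side: register the same typeid, no callable needed
ClientManager.register('get_queue')

def remote_manager_demo():
    mgr = ServerManager(address=('127.0.0.1', 0), authkey=b'secret')
    server = mgr.get_server()  # serve in this process (normally start())
    threading.Thread(target=server.serve_forever, daemon=True).start()

    client = ClientManager(address=server.address, authkey=b'secret')
    client.connect()  # attach to the running server over the socket
    q = client.get_queue()
    q.put('hello')
    return q.get()

if __name__ == '__main__':
    print(remote_manager_demo())
```

In a real deployment the server would call `start()` (or `get_server().serve_forever()` in its own process) on a fixed address, and clients on other machines would connect with the same authkey.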

Advanced Manager Patterns

Shared Cache with Manager

from billiard import Process, Manager
import time
import random

def cache_worker(cache, lock, worker_id):
    """Worker that uses shared cache"""
    for i in range(5):
        key = f"key_{random.randint(1, 10)}"
        
        # Try to get from cache
        with lock:
            if key in cache:
                value = cache[key]
                print(f"Worker {worker_id}: cache hit for {key} = {value}")
            else:
                # Simulate expensive computation
                value = random.randint(100, 999)
                cache[key] = value
                print(f"Worker {worker_id}: cache miss, computed {key} = {value}")
        
        time.sleep(0.2)

def shared_cache_example():
    """Demonstrate shared cache using manager"""
    with Manager() as manager:
        cache = manager.dict()
        cache_lock = manager.Lock()
        
        # Start workers
        processes = []
        for i in range(4):
            p = Process(target=cache_worker, args=(cache, cache_lock, i))
            processes.append(p)
            p.start()
        
        for p in processes:
            p.join()
        
        print(f"Final cache contents: {dict(cache)}")

if __name__ == '__main__':
    shared_cache_example()

Work Distribution with Manager

from billiard import Process, Manager
import time
import random

def work_distributor(task_queue, result_dict, num_tasks):
    """Distribute tasks to workers"""
    for i in range(num_tasks):
        task = {
            'id': i,
            'data': random.randint(1, 100),
            'operation': random.choice(['square', 'cube', 'double'])
        }
        task_queue.put(task)
    
    # Add termination signals
    for _ in range(3):  # Number of workers
        task_queue.put(None)

def worker_processor(task_queue, result_dict, worker_id):
    """Process tasks from queue"""
    while True:
        task = task_queue.get()
        if task is None:
            break
        
        # Process task
        data = task['data']
        if task['operation'] == 'square':
            result = data ** 2
        elif task['operation'] == 'cube':
            result = data ** 3
        else:  # double
            result = data * 2
        
        result_dict[task['id']] = {
            'input': data,
            'operation': task['operation'],
            'result': result,
            'worker': worker_id
        }
        
        print(f"Worker {worker_id}: processed task {task['id']}")
        time.sleep(0.1)

def work_distribution_example():
    """Demonstrate work distribution pattern"""
    with Manager() as manager:
        task_queue = manager.Queue()
        results = manager.dict()
        
        # Start distributor
        distributor = Process(target=work_distributor, 
                            args=(task_queue, results, 15))
        distributor.start()
        
        # Start workers
        workers = []
        for i in range(3):
            worker = Process(target=worker_processor,
                           args=(task_queue, results, i))
            workers.append(worker)
            worker.start()
        
        # Wait for completion
        distributor.join()
        for worker in workers:
            worker.join()
        
        # Display results
        print(f"Processed {len(results)} tasks:")
        for task_id, result in sorted(results.items()):
            print(f"Task {task_id}: {result['input']} {result['operation']} = "
                  f"{result['result']} (worker {result['worker']})")

if __name__ == '__main__':
    work_distribution_example()

Manager Best Practices

  1. Use context managers (with Manager() as manager:) for automatic cleanup
  2. Minimize proxy method calls - cache frequently accessed values locally
  3. Use appropriate data structures - manager objects have method call overhead
  4. Consider alternatives for high-performance scenarios (shared memory arrays)
  5. Handle manager failures - managers run in separate processes and can fail
  6. Serialize access to shared objects when needed using manager locks
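Item 2 in practice: every proxy access is an IPC round-trip, so read-mostly data is better copied into a local snapshot once. A small sketch (stdlib fallback because billiard mirrors this API; the config keys are illustrative):

```python
try:
    from billiard import Manager
except ImportError:
    from multiprocessing import Manager  # billiard mirrors this API

def snapshot_demo():
    with Manager() as manager:
        config = manager.dict({'threshold': 3, 'scale': 2})

        # One bulk copy instead of an IPC round-trip per key access
        local = dict(config)
        return [i * local['scale'] for i in range(5) if i < local['threshold']]

if __name__ == '__main__':
    print(snapshot_demo())
```

The snapshot is a plain dict, so later writes by other processes are not reflected in it; re-copy when staleness matters.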

Performance Considerations

  • Method call overhead: Each proxy method call involves IPC
  • Serialization cost: Objects are pickled/unpickled for transfer
  • Network latency: Managers can run on remote machines
  • Memory usage: Objects stored in manager process memory
  • Garbage collection: Managed objects persist until manager shutdown
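The per-call overhead is easy to observe: summing a managed list element by element makes one round-trip per index, while a one-time bulk copy is ordinary memory access. A sketch (absolute timings vary by machine; stdlib fallback because billiard mirrors this API):

```python
import time

try:
    from billiard import Manager
except ImportError:
    from multiprocessing import Manager  # billiard mirrors this API

def overhead_demo():
    with Manager() as manager:
        shared = manager.list(range(200))

        t0 = time.perf_counter()
        proxy_sum = sum(shared[i] for i in range(200))  # 200 IPC round-trips
        t1 = time.perf_counter()

        local = list(shared)  # one bulk transfer
        local_sum = sum(local)
        t2 = time.perf_counter()

        print(f"proxy access: {t1 - t0:.4f}s, local copy: {t2 - t1:.4f}s")
        return proxy_sum, local_sum

if __name__ == '__main__':
    overhead_demo()
```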

Install with Tessl CLI

npx tessl i tessl/pypi-billiard
