CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-multiprocess

A fork of Python's multiprocessing module that provides enhanced serialization using dill.

Pending
Overview
Eval results
Files

shared-objects.mddocs/

Shared Objects and Memory

Objects and memory that can be shared between processes. Multiprocess provides both high-level managed objects through Manager and low-level shared memory constructs for different sharing patterns and performance requirements.

Capabilities

Manager Objects

High-level interface for creating shared objects managed by a server process.

def Manager():
    """
    Create a SyncManager instance for sharing objects between processes.
    
    The returned manager must be started (via start(), or by using it as a
    context manager) before shared objects can be created from it.
    
    Returns:
        SyncManager: manager instance that creates shared objects
    """

SyncManager Class

class SyncManager:
    """
    Manager that provides shared objects via a server process.
    
    Shared objects live in the server process; callers receive proxy
    objects that forward operations to it. The manager is also usable
    as a context manager: entry starts the server, exit shuts it down.
    """
    def start(self, initializer=None, initargs=()):
        """
        Start the manager's server process.
        
        Args:
            initializer: optional callable run in the server process when it starts
            initargs: tuple of arguments passed to initializer
        """
    
    def shutdown(self):
        """Shutdown the manager's server process."""
    
    def dict(self, *args, **kwargs):
        """
        Create a shared dictionary.
        
        Args:
            args, kwargs: forwarded to the dict constructor for initial contents
        
        Returns:
            DictProxy: proxy to a shared dict object
        """
    
    def list(self, sequence=()):
        """
        Create a shared list.
        
        Args:
            sequence: initial sequence to populate list
            
        Returns:
            ListProxy: proxy to a shared list object
        """
    
    def Namespace(self):
        """
        Create a shared namespace object whose attributes are shared.
        
        Returns:
            NamespaceProxy: proxy to a shared namespace
        """
    
    def Lock(self):
        """Create a shared Lock usable across processes."""
    
    def RLock(self):
        """Create a shared reentrant lock (RLock)."""
    
    def Semaphore(self, value=1):
        """Create a shared Semaphore with initial counter `value`."""
    
    def BoundedSemaphore(self, value=1):
        """Create a shared BoundedSemaphore with initial counter `value`."""
    
    def Condition(self, lock=None):
        """Create a shared Condition, optionally bound to an existing lock."""
    
    def Event(self):
        """Create a shared Event."""
    
    def Barrier(self, parties, action=None, timeout=None):
        """Create a shared Barrier for `parties` participants."""
    
    def Queue(self, maxsize=0):
        """Create a shared Queue (maxsize=0 means unbounded)."""
    
    def JoinableQueue(self, maxsize=0):
        """Create a shared JoinableQueue supporting task_done()/join()."""
    
    def Pool(self, processes=None, initializer=None, initargs=()):
        """Create a shared Pool managed by the server process."""
    
    def __enter__(self):
        """Context manager entry - starts manager."""
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - shuts down manager."""

Shared Memory Objects

Low-level shared memory constructs for direct memory sharing between processes.

# Import from shared_memory submodule
from multiprocess.shared_memory import SharedMemory, ShareableList
class SharedMemory:
    """
    Direct shared memory block accessible across processes.
    
    Create a new block with create=True and a size, or attach to an
    existing block by passing its name. Other processes attach by name
    and read/write bytes through the `buf` memoryview.
    
    Args:
        name: name of existing shared memory block (None to create new)
        create: if True, create new shared memory block
        size: size in bytes for new shared memory block
    """
    def __init__(self, name=None, create=False, size=0): ...
    
    def close(self):
        """Close access to the shared memory from this instance.
        
        Does not destroy the block itself (see unlink()).
        """
    
    def unlink(self):
        """
        Request deletion of the shared memory block.
        Should be called by only one process.
        """
    
    # Properties
    name: str    # Name of the shared memory block; pass to other processes to attach
    size: int    # Size of the shared memory block in bytes
    buf: memoryview  # Memory buffer for direct read/write access
class ShareableList:
    """
    List-like object stored in shared memory.
    
    Create a new list by passing a sequence, or attach to an existing
    one by passing its name. The underlying block is exposed via `shm`
    for cleanup (shm.close() / shm.unlink()).
    
    Args:
        sequence: initial sequence to populate the list (None for existing)
        name: name of existing shareable list (None to create new)
    """
    def __init__(self, sequence=None, name=None): ...
    
    def __getitem__(self, index):
        """Get item at index."""
    
    def __setitem__(self, index, value):
        """Set item at index."""
    
    def __len__(self):
        """Return length of list."""
    
    def copy(self):
        """
        Return a shallow copy as a regular (non-shared) list.
        
        Returns:
            list: copy of the shareable list
        """
    
    def count(self, value):
        """
        Return number of occurrences of value.
        
        Args:
            value: value to count
            
        Returns:
            int: number of occurrences
        """
    
    def index(self, value):
        """
        Return index of first occurrence of value.
        
        Args:
            value: value to find
            
        Returns:
            int: index of value
            
        Raises:
            ValueError: if value not found
        """
    
    # Properties
    format: str  # Format string describing stored types
    shm: SharedMemory  # Underlying shared memory block (close/unlink via this)

Shared ctypes Objects

Shared memory objects based on ctypes for typed data sharing.

# Import from sharedctypes for utility functions
from multiprocess.sharedctypes import copy, synchronized
def Value(typecode_or_type, *args, lock=True):
    """
    Create a shared ctypes object.
    
    Args:
        typecode_or_type: ctypes type or single character type code
            (e.g. 'i' for signed int, 'f' for float)
        args: initial value arguments passed through to the ctypes constructor
        lock: if True, create with synchronization lock; access the lock
            via the returned object's get_lock()
        
    Returns:
        SynchronizedBase: synchronized shared value (read/write via .value)
    """

def Array(typecode_or_type, size_or_initializer, lock=True):
    """
    Create a shared ctypes array.
    
    Args:
        typecode_or_type: ctypes type or single character type code
            (e.g. 'i' for signed int, 'f' for float)
        size_or_initializer: length of the array, or a sequence of initial values
        lock: if True, create with synchronization lock; access the lock
            via the returned object's get_lock()
        
    Returns:
        SynchronizedArray: synchronized shared array (supports indexing and slicing)
    """

def RawValue(typecode_or_type, *args):
    """
    Create an unsynchronized shared ctypes object.
    
    Callers are responsible for any locking needed for safe concurrent access.
    
    Args:
        typecode_or_type: ctypes type or single character type code
        args: initial value arguments passed through to the ctypes constructor
        
    Returns:
        ctypes object: raw shared value without locking
    """

def RawArray(typecode_or_type, size_or_initializer):
    """
    Create an unsynchronized shared ctypes array.
    
    Callers are responsible for any locking needed for safe concurrent access.
    
    Args:
        typecode_or_type: ctypes type or single character type code
        size_or_initializer: length of the array, or a sequence of initial values
        
    Returns:
        ctypes array: raw shared array without locking
    """

Utility Functions

def copy(obj):
    """
    Create a copy of a shared object.
    
    Args:
        obj: shared object to copy
        
    Returns:
        object: copy of the shared object
    """

def synchronized(obj, lock=None):
    """
    Wrap a shared object so access is protected by a lock.
    
    Args:
        obj: object to synchronize
        lock: lock to use (creates new if None)
        
    Returns:
        SynchronizedBase: synchronized wrapper around obj
    """

Usage Examples

Basic Manager Usage

from multiprocess import Process, Manager

def worker(shared_dict, shared_list, worker_id):
    """Record this worker's greeting in the shared dict and list.

    Args:
        shared_dict: mapping shared between processes
        shared_list: list shared between processes
        worker_id: identifier used to label the entries
    """
    key = f'worker_{worker_id}'
    shared_dict[key] = f'Hello from {worker_id}'
    shared_list.append(f'Item from worker {worker_id}')
    print(f"Worker {worker_id} completed")

if __name__ == '__main__':
    # A Manager server process hosts the shared dict and list; each
    # worker receives proxies that forward operations to it.
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()

        procs = [
            Process(target=worker, args=(shared_dict, shared_list, i))
            for i in range(3)
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()

        print(f"Final dict: {dict(shared_dict)}")
        print(f"Final list: {list(shared_list)}")

Shared Namespace

from multiprocess import Process, Manager

def update_namespace(ns, worker_id):
    """Bump the shared counter, log a message, and mark this worker done.

    NOTE(review): `counter` is updated with a read-then-write on the
    namespace, which is not atomic across processes — concurrent workers
    may lose increments. Confirm whether that is acceptable here.
    """
    ns.counter = ns.counter + 1
    ns.messages.append(f"Message from worker {worker_id}")
    # Dynamically attach a per-worker status attribute
    setattr(ns, f'worker_{worker_id}_status', 'completed')

if __name__ == '__main__':
    with Manager() as manager:
        # Shared namespace; managed containers must be assigned explicitly
        ns = manager.Namespace()
        ns.counter = 0
        ns.messages = manager.list()

        workers = [Process(target=update_namespace, args=(ns, i)) for i in range(4)]
        for p in workers:
            p.start()
        for p in workers:
            p.join()

        print(f"Counter: {ns.counter}")
        print(f"Messages: {list(ns.messages)}")

        # Dump every public attribute of the namespace
        public_attrs = [a for a in dir(ns) if not a.startswith('_')]
        for attr in public_attrs:
            print(f"{attr}: {getattr(ns, attr)}")

SharedMemory Direct Access

from multiprocess import Process
from multiprocess.shared_memory import SharedMemory
import numpy as np

def worker_process(shm_name, shape, dtype):
    """Attach to an existing shared memory block and add 10 to every element.

    Args:
        shm_name: name of the shared memory block to attach to
        shape: numpy shape of the data stored in the block
        dtype: numpy dtype of the stored data
    """
    # Attach by name; the block was created by the parent process
    existing_shm = SharedMemory(name=shm_name)
    view = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    view += 10
    print(f"Worker modified array: {view}")
    # Detach this process's mapping (does not destroy the block)
    existing_shm.close()

if __name__ == '__main__':
    data = np.array([1, 2, 3, 4, 5], dtype=np.int64)

    # Allocate a shared block big enough for the array, then copy it in
    shm = SharedMemory(create=True, size=data.nbytes)
    shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    shared_array[:] = data[:]
    print(f"Original array: {shared_array}")

    # The child attaches by name and mutates the same memory
    child = Process(target=worker_process, args=(shm.name, data.shape, data.dtype))
    child.start()
    child.join()
    print(f"Array after worker: {shared_array}")

    # Release our mapping, then request deletion of the block
    shm.close()
    shm.unlink()

ShareableList Usage

from multiprocess import Process
from multiprocess.shared_memory import ShareableList

def list_worker(shared_list_name, worker_id):
    # Attach to existing shareable list
    shared_list = ShareableList(name=shared_list_name)
    
    # Modify list elements
    for i in range(len(shared_list)):
        if isinstance(shared_list[i], (int, float)):
            shared_list[i] = shared_list[i] + worker_id * 10
    
    print(f"Worker {worker_id} processed list")
    
    # List operations
    print(f"List length: {len(shared_list)}")
    print(f"Copy of list: {shared_list.copy()}")

if __name__ == '__main__':
    # Build the list in shared memory; workers attach via its shm name
    shared_list = ShareableList([1, 2, 3, 4, 5])
    print(f"Initial list: {shared_list.copy()}")

    workers = [
        Process(target=list_worker, args=(shared_list.shm.name, wid))
        for wid in (1, 2)
    ]
    for p in workers:
        p.start()
    for p in workers:
        p.join()

    print(f"Final list: {shared_list.copy()}")

    # Release our mapping, then delete the underlying block
    shared_list.shm.close()
    shared_list.shm.unlink()

ctypes Shared Values

from multiprocess import Process, Value, Array
import ctypes

def modify_shared_data(shared_value, shared_array):
    """Add 10 to the shared value and double every array element.

    Both objects were created with lock=True, so each update is done
    while holding the object's own lock via get_lock().
    """
    with shared_value.get_lock():
        shared_value.value += 10

    with shared_array.get_lock():
        for i, current in enumerate(shared_array):
            shared_array[i] = current * 2

if __name__ == '__main__':
    # 'i' = C signed int, 'f' = C single-precision float
    shared_int = Value('i', 5)
    shared_floats = Array('f', [1.0, 2.0, 3.0, 4.0])

    print(f"Initial value: {shared_int.value}")
    print(f"Initial array: {list(shared_floats[:])}")

    workers = [
        Process(target=modify_shared_data, args=(shared_int, shared_floats))
        for _ in range(2)
    ]
    for p in workers:
        p.start()
    for p in workers:
        p.join()

    print(f"Final value: {shared_int.value}")
    print(f"Final array: {list(shared_floats[:])}")

Raw Shared Objects (No Locking)

from multiprocess import Process, RawValue, RawArray, Lock
import time

def worker_with_manual_locking(raw_value, raw_array, lock, worker_id):
    """Increment the raw value and every array slot five times.

    Raw shared objects carry no lock of their own, so each
    read-modify-write is wrapped in the explicitly supplied lock.
    """
    for _ in range(5):
        with lock:
            previous = raw_value.value
            time.sleep(0.01)  # Simulate some work inside the critical section
            raw_value.value = previous + 1

            for i in range(len(raw_array)):
                raw_array[i] += 1

        print(f"Worker {worker_id} iteration completed")
        time.sleep(0.1)

if __name__ == '__main__':
    # Raw shared objects perform no locking of their own
    raw_value = RawValue('i', 0)
    raw_array = RawArray('i', [0, 0, 0])
    lock = Lock()  # one explicit lock shared by every worker

    print(f"Initial value: {raw_value.value}")
    print(f"Initial array: {list(raw_array[:])}")

    workers = [
        Process(target=worker_with_manual_locking,
                args=(raw_value, raw_array, lock, wid))
        for wid in range(3)
    ]
    for p in workers:
        p.start()
    for p in workers:
        p.join()

    print(f"Final value: {raw_value.value}")
    print(f"Final array: {list(raw_array[:])}")

Complex Shared Data Structures

from multiprocess import Process, Manager
import time

def data_processor(shared_data, processor_id):
    """Append three processed items and update the shared statistics.

    Args:
        shared_data: mapping with 'items' (list), 'stats' (mapping) and
            'stats_lock' (lock guarding the stats)
        processor_id: identifier recorded on each produced item
    """
    for item_number in range(3):
        shared_data['items'].append({
            'processor_id': processor_id,
            'item_number': item_number,
            'processed_at': time.time(),
            'data': f"Processed by {processor_id}",
        })

    # Stats are shared by every processor, so guard the update
    with shared_data['stats_lock']:
        shared_data['stats']['total_processed'] += 3
        shared_data['stats']['processors'][processor_id] = True

def monitor_progress(shared_data):
    """Poll shared stats every 0.5s, printing progress until 9 items are done."""
    start_time = time.time()
    while True:
        time.sleep(0.5)
        with shared_data['stats_lock']:
            stats = shared_data['stats']
            total = stats['total_processed']
            active_processors = sum(stats['processors'].values())

        elapsed = time.time() - start_time
        print(f"Time: {elapsed:.1f}s, Processed: {total}, Active: {active_processors}")

        if total >= 9:  # 3 processors * 3 items each
            break

if __name__ == '__main__':
    with Manager() as manager:
        # Nested managed containers: proxies may hold other proxies
        shared_data = manager.dict({
            'items': manager.list(),
            'stats': manager.dict({
                'total_processed': 0,
                'processors': manager.dict({0: False, 1: False, 2: False})
            }),
            'stats_lock': manager.Lock()
        })

        processors = [
            Process(target=data_processor, args=(shared_data, i))
            for i in range(3)
        ]
        for p in processors:
            p.start()

        # Progress reporter runs alongside the processors
        monitor = Process(target=monitor_progress, args=(shared_data,))
        monitor.start()

        for p in processors:
            p.join()
        monitor.join()

        print(f"\nFinal results:")
        print(f"Total items processed: {len(shared_data['items'])}")
        print(f"Statistics: {dict(shared_data['stats'])}")

Performance Comparison

from multiprocess import Process, Manager, Value, RawValue, Lock
import time

def test_synchronized_value(shared_val, iterations):
    """Increment a synchronized Value `iterations` times under its lock.

    Returns:
        float: wall-clock seconds the loop took
    """
    started = time.time()
    for _ in range(iterations):
        with shared_val.get_lock():
            shared_val.value += 1
    return time.time() - started

def test_raw_value(raw_val, lock, iterations):
    """Increment a raw value `iterations` times under an explicit lock.

    Returns:
        float: wall-clock seconds the loop took
    """
    started = time.time()
    for _ in range(iterations):
        with lock:
            raw_val.value += 1
    return time.time() - started

def benchmark_worker(test_type, shared_obj, extra_arg, iterations):
    """Run the requested benchmark and report how long it took."""
    duration = (
        test_synchronized_value(shared_obj, iterations)
        if test_type == 'synchronized'
        else test_raw_value(shared_obj, extra_arg, iterations)
    )
    print(f"{test_type} test completed in {duration:.3f} seconds")

if __name__ == '__main__':
    iterations = 10000

    # Case 1: Value with its built-in synchronization
    print("Testing synchronized Value...")
    sync_val = Value('i', 0)
    proc = Process(target=benchmark_worker,
                   args=('synchronized', sync_val, None, iterations))
    proc.start()
    proc.join()
    print(f"Synchronized final value: {sync_val.value}")

    # Case 2: RawValue guarded by an explicit Lock
    print("\nTesting raw Value with manual lock...")
    raw_val = RawValue('i', 0)
    manual_lock = Lock()
    proc = Process(target=benchmark_worker,
                   args=('raw', raw_val, manual_lock, iterations))
    proc.start()
    proc.join()
    print(f"Raw final value: {raw_val.value}")

Install with Tessl CLI

npx tessl i tessl/pypi-multiprocess

docs

communication.md

context-config.md

index.md

pools.md

process-management.md

shared-objects.md

synchronization.md

tile.json