A fork of Python's multiprocessing module that uses dill to provide enhanced serialization.
—
Objects and memory that can be shared between processes. Multiprocess provides both high-level managed objects through Manager and low-level shared memory constructs for different sharing patterns and performance requirements.
High-level interface for creating shared objects managed by a server process.
def Manager():
    """
    Create a SyncManager instance for sharing objects between processes.

    Returns:
        SyncManager: manager instance that creates shared objects
    """


class SyncManager:
    """
    Manager that provides shared objects via a server process.

    The factory methods below each create one kind of shared object. The
    manager can also be used as a context manager: entering starts it and
    exiting shuts it down.
    """

    def start(self, initializer=None, initargs=()):
        """
        Start the manager's server process.

        Args:
            initializer: callable to run when server starts
            initargs: arguments for initializer
        """

    def shutdown(self):
        """Shutdown the manager's server process."""

    def dict(self, *args, **kwargs):
        """
        Create a shared dictionary.

        Returns:
            DictProxy: proxy to a shared dict object
        """

    def list(self, sequence=()):
        """
        Create a shared list.

        Args:
            sequence: initial sequence to populate list

        Returns:
            ListProxy: proxy to a shared list object
        """

    def Namespace(self):
        """
        Create a shared namespace object.

        Returns:
            NamespaceProxy: proxy to a shared namespace
        """

    def Lock(self):
        """Create a shared Lock."""

    def RLock(self):
        """Create a shared RLock."""

    def Semaphore(self, value=1):
        """Create a shared Semaphore."""

    def BoundedSemaphore(self, value=1):
        """Create a shared BoundedSemaphore."""

    def Condition(self, lock=None):
        """Create a shared Condition."""

    def Event(self):
        """Create a shared Event."""

    def Barrier(self, parties, action=None, timeout=None):
        """Create a shared Barrier."""

    def Queue(self, maxsize=0):
        """Create a shared Queue."""

    def JoinableQueue(self, maxsize=0):
        """Create a shared JoinableQueue."""

    def Pool(self, processes=None, initializer=None, initargs=()):
        """Create a shared Pool."""

    def __enter__(self):
        """Context manager entry - starts manager."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - shuts down manager."""


# Low-level shared memory constructs for direct memory sharing between processes.
# Import from shared_memory submodule
from multiprocess.shared_memory import SharedMemory, ShareableList


class SharedMemory:
    """
    Direct shared memory block accessible across processes.

    Args:
        name: name of existing shared memory block (None to create new)
        create: if True, create new shared memory block
        size: size in bytes for new shared memory block
    """

    def __init__(self, name=None, create=False, size=0): ...

    def close(self):
        """Close access to the shared memory from this instance."""

    def unlink(self):
        """
        Request deletion of the shared memory block.

        Should be called by only one process.
        """

    # Properties
    name: str  # Name of the shared memory block
    size: int  # Size of the shared memory block in bytes
    buf: memoryview  # Memory buffer for direct access


class ShareableList:
    """
    List-like object stored in shared memory.

    Args:
        sequence: initial sequence to populate the list (None for existing)
        name: name of existing shareable list (None to create new)
    """

    def __init__(self, sequence=None, name=None): ...

    def __getitem__(self, index):
        """Get item at index."""

    def __setitem__(self, index, value):
        """Set item at index."""

    def __len__(self):
        """Return length of list."""

    def copy(self):
        """
        Return a shallow copy as a regular list.

        NOTE(review): the upstream multiprocessing.shared_memory.ShareableList
        documents only count() and index(); confirm that multiprocess really
        provides copy(). list(shareable_list) yields the same regular list.

        Returns:
            list: copy of the shareable list
        """

    def count(self, value):
        """
        Return number of occurrences of value.

        Args:
            value: value to count

        Returns:
            int: number of occurrences
        """

    def index(self, value):
        """
        Return index of first occurrence of value.

        Args:
            value: value to find

        Returns:
            int: index of value

        Raises:
            ValueError: if value not found
        """

    # Properties
    format: str  # Format string describing stored types
    shm: SharedMemory  # Underlying shared memory block


# Shared memory objects based on ctypes for typed data sharing.
# Import from sharedctypes for utility functions
from multiprocess.sharedctypes import copy, synchronized


def Value(typecode_or_type, *args, lock=True):
    """
    Create a shared ctypes object.

    Args:
        typecode_or_type: ctypes type or single character type code
        args: initial value arguments
        lock: if True, create with synchronization lock

    Returns:
        SynchronizedBase: synchronized shared value
    """
def Array(typecode_or_type, size_or_initializer, lock=True):
    """
    Create a shared ctypes array.

    Args:
        typecode_or_type: ctypes type or single character type code
        size_or_initializer: size of array or initial values
        lock: if True, create with synchronization lock

    Returns:
        SynchronizedArray: synchronized shared array
    """
def RawValue(typecode_or_type, *args):
    """
    Create an unsynchronized shared ctypes object.

    Args:
        typecode_or_type: ctypes type or single character type code
        args: initial value arguments

    Returns:
        ctypes object: raw shared value without locking
    """
def RawArray(typecode_or_type, size_or_initializer):
    """
    Create an unsynchronized shared ctypes array.

    Args:
        typecode_or_type: ctypes type or single character type code
        size_or_initializer: size of array or initial values

    Returns:
        ctypes array: raw shared array without locking
    """


def copy(obj):
    """
    Create a copy of a shared object.

    Args:
        obj: shared object to copy

    Returns:
        object: copy of the shared object
    """
def synchronized(obj, lock=None):
    """
    Make a shared object process-safe.

    The returned wrapper uses a lock to synchronize access to the object.

    Args:
        obj: object to synchronize
        lock: lock to use (creates new if None)

    Returns:
        SynchronizedBase: synchronized wrapper
    """


from multiprocess import Process, Manager
def worker(shared_dict, shared_list, worker_id):
    """
    Record one entry per worker in a shared dict and a shared list.

    Args:
        shared_dict: mapping that receives the key ``worker_<id>``
        shared_list: list-like object that receives one appended item
        worker_id: identifier used in the key, the values and the log line
    """
    # Modify shared dictionary
    shared_dict[f'worker_{worker_id}'] = f'Hello from {worker_id}'
    # Append to shared list
    shared_list.append(f'Item from worker {worker_id}')
    print(f"Worker {worker_id} completed")
if __name__ == '__main__':
    # Everything that touches the proxies stays inside the `with` block:
    # leaving it shuts the manager down.
    with Manager() as manager:
        # Create shared objects
        shared_dict = manager.dict()
        shared_list = manager.list()

        # Create processes
        processes = []
        for i in range(3):
            p = Process(target=worker, args=(shared_dict, shared_list, i))
            p.start()
            processes.append(p)

        # Wait for completion
        for p in processes:
            p.join()

        # Copy the proxies into plain containers so the contents are printed
        print(f"Final dict: {dict(shared_dict)}")
        print(f"Final list: {list(shared_list)}")


from multiprocess import Process, Manager
def update_namespace(ns, worker_id, lock=None):
    """
    Update a shared namespace from a worker process.

    Args:
        ns: shared namespace with a numeric ``counter`` and a list-like
            ``messages`` attribute
        worker_id: identifier used in the message and the status attribute
        lock: optional lock held while ``counter`` is incremented; when
            None the increment is not synchronized
    """
    # `ns.counter += 1` reads the attribute and then writes it back as two
    # separate operations, so concurrent workers can overwrite each other's
    # increment unless the pair is serialized.
    if lock is not None:
        with lock:
            ns.counter += 1
    else:
        ns.counter += 1

    ns.messages.append(f"Message from worker {worker_id}")

    # Create new attributes
    setattr(ns, f'worker_{worker_id}_status', 'completed')


if __name__ == '__main__':
    with Manager() as manager:
        # Create shared namespace
        ns = manager.Namespace()
        ns.counter = 0
        ns.messages = manager.list()

        # Serializes the counter increment across the workers
        counter_lock = manager.Lock()

        # Create processes
        processes = []
        for i in range(4):
            p = Process(target=update_namespace, args=(ns, i, counter_lock))
            p.start()
            processes.append(p)

        for p in processes:
            p.join()

        print(f"Counter: {ns.counter}")
        print(f"Messages: {list(ns.messages)}")

        # Print all attributes. dir() on the proxy only lists the proxy's
        # own (underscore-prefixed) members, so fetch a copy of the remote
        # namespace and inspect that instead.
        for attr, value in vars(ns._getvalue()).items():
            if not attr.startswith('_'):
                print(f"{attr}: {value}")


from multiprocess import Process
from multiprocess.shared_memory import SharedMemory
import numpy as np
def worker_process(shm_name, shape, dtype):
    """
    Attach to an existing shared memory block and add 10 to every element.

    Args:
        shm_name: name of the shared memory block to attach to
        shape: shape of the array stored in the block
        dtype: NumPy dtype of the array stored in the block
    """
    # Attach to existing shared memory
    existing_shm = SharedMemory(name=shm_name)
    try:
        # Create numpy array from shared memory (a view, no copy)
        array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)

        # Modify the array in place so the change lands in the shared block
        array += 10
        print(f"Worker modified array: {array}")
    finally:
        # Clean up this process's attachment even if the work above failed;
        # the creating process is responsible for unlink().
        existing_shm.close()
if __name__ == '__main__':
    # Create data
    data = np.array([1, 2, 3, 4, 5], dtype=np.int64)

    # Create shared memory
    shm = SharedMemory(create=True, size=data.nbytes)
    try:
        # Copy data to shared memory
        shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
        shared_array[:] = data[:]
        print(f"Original array: {shared_array}")

        # Create worker process; only the block's name and the array layout
        # are passed, the worker re-attaches by name.
        p = Process(target=worker_process, args=(shm.name, data.shape, data.dtype))
        p.start()
        p.join()

        print(f"Array after worker: {shared_array}")
    finally:
        # Clean up even if something above failed, so the block is not
        # left behind. unlink() is called once, by the creating process.
        shm.close()
        shm.unlink()


from multiprocess import Process
from multiprocess.shared_memory import ShareableList
def list_worker(shared_list_name, worker_id):
    """
    Attach to an existing ShareableList and offset its numeric items.

    Every int or float item is increased by ``worker_id * 10``; other
    items are left untouched.

    NOTE(review): the read-modify-write below is not synchronized, so
    workers running at the same time can lose each other's updates. Guard
    it with a lock if exact totals matter.

    Args:
        shared_list_name: name of the shared memory block backing the list
        worker_id: identifier used for the offset and the log line
    """
    # Attach to existing shareable list
    shared_list = ShareableList(name=shared_list_name)
    try:
        # Modify list elements
        for i in range(len(shared_list)):
            item = shared_list[i]
            if isinstance(item, (int, float)):
                shared_list[i] = item + worker_id * 10

        print(f"Worker {worker_id} processed list")

        # List operations. list(...) builds the regular-list copy;
        # ShareableList itself has no copy() method upstream.
        print(f"List length: {len(shared_list)}")
        print(f"Copy of list: {list(shared_list)}")
    finally:
        # Release this process's attachment; the creator unlinks the block.
        shared_list.shm.close()
if __name__ == '__main__':
    # Create shareable list
    initial_data = [1, 2, 3, 4, 5]
    shared_list = ShareableList(initial_data)
    try:
        # list(...) builds a regular-list snapshot; ShareableList has no
        # copy() method upstream.
        print(f"Initial list: {list(shared_list)}")

        # Create worker processes; they re-attach via the block's name
        processes = []
        for i in range(2):
            p = Process(target=list_worker, args=(shared_list.shm.name, i + 1))
            p.start()
            processes.append(p)

        for p in processes:
            p.join()

        print(f"Final list: {list(shared_list)}")
    finally:
        # Clean up even if something above failed, so the block is not
        # left behind.
        shared_list.shm.close()
        shared_list.shm.unlink()


from multiprocess import Process, Value, Array
import ctypes
def modify_shared_data(shared_value, shared_array):
    """
    Add 10 to a synchronized value and double every array element.

    Each update runs under the object's own lock (``get_lock()``), so the
    read-modify-write steps are not interleaved with other holders of the
    same lock.

    Args:
        shared_value: object with ``value`` and ``get_lock()``
        shared_array: indexable, sized object with ``get_lock()``
    """
    # Modify shared value
    with shared_value.get_lock():
        shared_value.value += 10

    # Modify shared array; hold the lock for the whole pass so the array
    # is doubled as one unit.
    with shared_array.get_lock():
        for i in range(len(shared_array)):
            shared_array[i] = shared_array[i] * 2
if __name__ == '__main__':
    # Create shared value (integer)
    shared_int = Value('i', 5)  # 'i' = signed int

    # Create shared array (floats)
    shared_floats = Array('f', [1.0, 2.0, 3.0, 4.0])  # 'f' = float

    print(f"Initial value: {shared_int.value}")
    print(f"Initial array: {list(shared_floats[:])}")

    # Create processes
    processes = []
    for i in range(2):
        p = Process(target=modify_shared_data, args=(shared_int, shared_floats))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    print(f"Final value: {shared_int.value}")
    print(f"Final array: {list(shared_floats[:])}")


from multiprocess import Process, RawValue, RawArray, Lock
import time
def worker_with_manual_locking(raw_value, raw_array, lock, worker_id,
                               iterations=5, work_time=0.01, pause=0.1):
    """
    Increment a raw value and every raw array element under an explicit lock.

    Args:
        raw_value: object with a numeric ``value`` attribute
        raw_array: indexable, sized sequence of numbers
        lock: context manager guarding both objects
        worker_id: identifier used in the log line
        iterations: number of locked update rounds (default 5)
        work_time: seconds slept inside the lock to simulate work
            (default 0.01)
        pause: seconds slept between rounds, outside the lock (default 0.1)
    """
    for _ in range(iterations):
        # Manual locking for raw shared objects
        with lock:
            # Modify raw value. The sleep sits between the read and the
            # write, so without the lock another worker could update the
            # value in between and have its change overwritten.
            old_val = raw_value.value
            time.sleep(work_time)  # Simulate some work
            raw_value.value = old_val + 1

            # Modify raw array
            for i in range(len(raw_array)):
                raw_array[i] += 1

        print(f"Worker {worker_id} iteration completed")
        time.sleep(pause)
if __name__ == '__main__':
    # Create raw shared objects (no automatic locking)
    raw_value = RawValue('i', 0)
    raw_array = RawArray('i', [0, 0, 0])

    # Create manual lock; every worker receives the same one
    lock = Lock()

    print(f"Initial value: {raw_value.value}")
    print(f"Initial array: {list(raw_array[:])}")

    # Create worker processes
    processes = []
    for i in range(3):
        p = Process(target=worker_with_manual_locking,
                    args=(raw_value, raw_array, lock, i))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    print(f"Final value: {raw_value.value}")
    print(f"Final array: {list(raw_array[:])}")


from multiprocess import Process, Manager
import time
def data_processor(shared_data, processor_id, item_count=3):
    """
    Append processed items to the shared structure and update its statistics.

    Args:
        shared_data: mapping with an ``'items'`` list, a ``'stats'`` mapping
            (``'total_processed'`` counter and ``'processors'`` flags) and a
            ``'stats_lock'`` context manager
        processor_id: identifier stored in each item and flagged in the stats
        item_count: number of items to add (default 3); the same number is
            added to ``total_processed``
    """
    # Add processed items
    for i in range(item_count):
        item = {
            'processor_id': processor_id,
            'item_number': i,
            'processed_at': time.time(),
            'data': f"Processed by {processor_id}"
        }
        shared_data['items'].append(item)

    # Update statistics under the lock: `+=` on the shared counter is a
    # read followed by a write.
    with shared_data['stats_lock']:
        shared_data['stats']['total_processed'] += item_count
        shared_data['stats']['processors'][processor_id] = True
def monitor_progress(shared_data, expected_total=9, poll_interval=0.5):
    """
    Monitor processing progress until the expected item count is reached.

    Args:
        shared_data: mapping with a ``'stats'`` mapping
            (``'total_processed'`` counter and ``'processors'`` flags) and a
            ``'stats_lock'`` context manager
        expected_total: stop once this many items have been processed
            (default 9: 3 processors * 3 items each)
        poll_interval: seconds to sleep before each poll (default 0.5)
    """
    start_time = time.time()
    while True:
        time.sleep(poll_interval)

        # Read both statistics under the lock so they belong to the same
        # snapshot.
        with shared_data['stats_lock']:
            total = shared_data['stats']['total_processed']
            # A processor's flag is set once it has recorded its results,
            # so the sum counts finished processors.
            finished = sum(shared_data['stats']['processors'].values())

        elapsed = time.time() - start_time
        print(f"Time: {elapsed:.1f}s, Processed: {total}, Finished: {finished}")

        if total >= expected_total:
            break
if __name__ == '__main__':
    with Manager() as manager:
        # Create complex shared data structure. The nested containers are
        # manager objects as well, so workers can mutate them in place.
        shared_data = manager.dict({
            'items': manager.list(),
            'stats': manager.dict({
                'total_processed': 0,
                'processors': manager.dict({0: False, 1: False, 2: False})
            }),
            'stats_lock': manager.Lock()
        })

        # Create processor processes
        processors = []
        for i in range(3):
            p = Process(target=data_processor, args=(shared_data, i))
            p.start()
            processors.append(p)

        # Create monitor process
        monitor = Process(target=monitor_progress, args=(shared_data,))
        monitor.start()

        # Wait for processors to complete
        for p in processors:
            p.join()

        # Wait for monitor
        monitor.join()

        print("\nFinal results:")
        print(f"Total items processed: {len(shared_data['items'])}")

        # dict() copies only the outer level; convert the nested proxy too
        # so the per-processor flags are printed rather than a proxy repr.
        stats = dict(shared_data['stats'])
        stats['processors'] = dict(stats['processors'])
        print(f"Statistics: {stats}")


from multiprocess import Process, Manager, Value, RawValue, Lock
import time
def test_synchronized_value(shared_val, iterations):
    """
    Test with automatic synchronization.

    Increments ``shared_val.value`` ``iterations`` times, taking the
    object's own lock (``get_lock()``) around each increment.

    Args:
        shared_val: object with ``value`` and ``get_lock()``
        iterations: number of increments to perform

    Returns:
        float: elapsed time in seconds
    """
    # perf_counter() is the clock intended for measuring short durations;
    # time.time() can jump if the system clock is adjusted.
    start_time = time.perf_counter()
    for _ in range(iterations):
        with shared_val.get_lock():
            shared_val.value += 1
    return time.perf_counter() - start_time
def test_raw_value(raw_val, lock, iterations):
    """
    Test with manual synchronization.

    Increments ``raw_val.value`` ``iterations`` times, holding the
    caller-supplied ``lock`` around each increment.

    Args:
        raw_val: object with a numeric ``value`` attribute
        lock: context manager guarding ``raw_val``
        iterations: number of increments to perform

    Returns:
        float: elapsed time in seconds
    """
    # perf_counter() is the clock intended for measuring short durations;
    # time.time() can jump if the system clock is adjusted.
    start_time = time.perf_counter()
    for _ in range(iterations):
        with lock:
            raw_val.value += 1
    return time.perf_counter() - start_time
def benchmark_worker(test_type, shared_obj, extra_arg, iterations):
    """
    Run one benchmark variant and print how long it took.

    Args:
        test_type: ``'synchronized'`` selects test_synchronized_value; any
            other value selects test_raw_value
        shared_obj: the shared value to increment
        extra_arg: lock passed to test_raw_value; ignored for the
            synchronized variant
        iterations: number of increments to perform
    """
    if test_type == 'synchronized':
        duration = test_synchronized_value(shared_obj, iterations)
    else:
        duration = test_raw_value(shared_obj, extra_arg, iterations)

    print(f"{test_type} test completed in {duration:.3f} seconds")
if __name__ == '__main__':
    iterations = 10000

    # Test 1: Synchronized Value
    print("Testing synchronized Value...")
    sync_val = Value('i', 0)

    p1 = Process(target=benchmark_worker,
                 args=('synchronized', sync_val, None, iterations))
    p1.start()
    p1.join()

    print(f"Synchronized final value: {sync_val.value}")

    # Test 2: Raw Value with manual lock
    print("\nTesting raw Value with manual lock...")
    raw_val = RawValue('i', 0)
    manual_lock = Lock()

    p2 = Process(target=benchmark_worker,
                 args=('raw', raw_val, manual_lock, iterations))
    p2.start()
    p2.join()

    print(f"Raw final value: {raw_val.value}")


# Install with Tessl CLI:
#   npx tessl i tessl/pypi-multiprocess