Python multiprocessing fork with improvements and bugfixes for distributed task processing.

Quality: Pending — best-practices review has not yet been completed.
Impact: Pending — no eval scenarios have been run.
Synchronized and unsynchronized shared memory objects for efficient data sharing between processes using ctypes-based values and arrays.
Create shared values that can be accessed by multiple processes with optional synchronization.
def Value(typecode_or_type, *args, lock=True, ctx=None):
    """
    Create a synchronized shared ctypes value.

    Parameters:
    - typecode_or_type: ctypes type or single character typecode
    - *args: initialization arguments for the value
    - lock: if True (default), operations are synchronized; if False, unsynchronized
    - ctx: multiprocessing context

    Returns:
    SynchronizedBase wrapper around ctypes value
    """
def RawValue(typecode_or_type, *args):
    """
    Create an unsynchronized shared ctypes value.

    Parameters:
    - typecode_or_type: ctypes type or single character typecode
    - *args: initialization arguments for the value

    Returns:
    Raw ctypes value (no synchronization)

    Common typecodes:
    'i' - signed int, 'f' - float, 'd' - double, 'c' - char,
    'b' - signed char, 'B' - unsigned char, 'h' - short, 'l' - long

    Usage example follows below.
    """
# Usage example: synchronized vs. unsynchronized shared values.
from billiard import Process, Value, RawValue
import time
import ctypes


def worker_with_shared_value(shared_counter, worker_id, iterations):
    """Worker that increments a shared counter under its lock."""
    for i in range(iterations):
        # Hold the lock across the whole read-modify-write so no
        # increment is lost even though we sleep mid-update.
        with shared_counter.get_lock():
            old_value = shared_counter.value
            time.sleep(0.001)  # Simulate some work
            shared_counter.value = old_value + 1
        print(f"Worker {worker_id}: counter = {shared_counter.value}")


def raw_value_worker(raw_val, worker_id):
    """Worker using unsynchronized raw value (unsafe!)."""
    for i in range(5):
        # += on a RawValue is not atomic; concurrent workers may lose updates.
        raw_val.value += 1
        print(f"Worker {worker_id}: raw value = {raw_val.value}")
        time.sleep(0.1)


if __name__ == '__main__':
    # Synchronized shared value
    counter = Value('i', 0)  # Integer initialized to 0
    print(f"Initial counter value: {counter.value}")

    # Start workers that safely increment counter
    processes = []
    for i in range(3):
        p = Process(target=worker_with_shared_value, args=(counter, i, 5))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f"Final counter value: {counter.value}")

    # Demonstrate explicit ctypes type usage
    float_value = Value(ctypes.c_double, 3.14159)
    print(f"Float value: {float_value.value}")

    # Raw value (no synchronization)
    raw_counter = RawValue('i', 0)

    # Start workers with raw value (potential race conditions)
    raw_processes = []
    for i in range(2):
        p = Process(target=raw_value_worker, args=(raw_counter, i))
        raw_processes.append(p)
        p.start()
    for p in raw_processes:
        p.join()
    print(f"Final raw counter: {raw_counter.value}")

# Create shared arrays that can be accessed by multiple processes with
# optional synchronization.
def Array(typecode_or_type, size_or_initializer, lock=True, ctx=None):
    """
    Create a synchronized shared ctypes array.

    Parameters:
    - typecode_or_type: ctypes type or single character typecode
    - size_or_initializer: array size (int) or sequence to initialize from
    - lock: if True (default), operations are synchronized; if False, unsynchronized
    - ctx: multiprocessing context

    Returns:
    SynchronizedArray wrapper around ctypes array
    """
def RawArray(typecode_or_type, size_or_initializer):
    """
    Create an unsynchronized shared ctypes array.

    Parameters:
    - typecode_or_type: ctypes type or single character typecode
    - size_or_initializer: array size (int) or sequence to initialize from

    Returns:
    Raw ctypes array (no synchronization)

    Usage example follows below.
    """
# Usage example: synchronized shared arrays.
from billiard import Process, Array, RawArray
import time


def array_worker(shared_array, worker_id, start_idx, count):
    """Worker that modifies part of a shared array under its lock."""
    with shared_array.get_lock():
        for i in range(count):
            idx = start_idx + i
            if idx < len(shared_array):  # Guard against out-of-range writes
                shared_array[idx] = worker_id * 100 + i
                print(f"Worker {worker_id}: set array[{idx}] = {shared_array[idx]}")
                time.sleep(0.1)


def array_reader(shared_array):
    """Process that reads a consistent snapshot of the shared array."""
    time.sleep(1)  # Let writers work first
    with shared_array.get_lock():
        print("Array contents:", list(shared_array[:]))
        print("Array sum:", sum(shared_array))


def matrix_worker(matrix, row, cols):
    """Worker that fills one row of a flattened (row-major) 2D array."""
    for col in range(cols):
        idx = row * cols + col
        matrix[idx] = row * cols + col + 1  # Fill with sequential values
        time.sleep(0.05)


if __name__ == '__main__':
    # Create synchronized shared array
    shared_arr = Array('i', 10)  # Integer array of size 10
    print(f"Initial array: {list(shared_arr[:])}")

    # Start workers to modify different parts of the array
    processes = []
    for i in range(3):
        start = i * 3
        p = Process(target=array_worker, args=(shared_arr, i, start, 3))
        processes.append(p)
        p.start()

    # Start reader process
    reader_proc = Process(target=array_reader, args=(shared_arr,))
    reader_proc.start()
    processes.append(reader_proc)

    for p in processes:
        p.join()

    # Array built from an initializer sequence
    init_data = [1, 2, 3, 4, 5]
    initialized_array = Array('i', init_data)
    print(f"Initialized array: {list(initialized_array[:])}")

    # 2D array simulation (flattened, row-major)
    rows, cols = 3, 4
    matrix = Array('i', rows * cols)

    # Process each row in parallel
    matrix_procs = []
    for row in range(rows):
        p = Process(target=matrix_worker, args=(matrix, row, cols))
        matrix_procs.append(p)
        p.start()
    for p in matrix_procs:
        p.join()

    # Print matrix
    print("Matrix:")
    for row in range(rows):
        row_data = [matrix[row * cols + col] for col in range(cols)]
        print(row_data)

# Additional functions for working with shared memory objects.
def copy(obj):
    """
    Create a copy of a shared object.

    Parameters:
    - obj: shared object to copy

    Returns:
    Copy of the shared object
    """
def synchronized(obj, lock=None, ctx=None):
    """
    Add synchronization wrapper to an object.

    Parameters:
    - obj: object to wrap
    - lock: lock to use (creates new Lock if None)
    - ctx: multiprocessing context

    Returns:
    Synchronized wrapper around object

    Usage example follows below.
    """
# Usage example: adding synchronization to a raw shared object.
from billiard import Process, RawArray, Lock
from billiard.sharedctypes import synchronized, copy
import time


def synchronized_access_example():
    """Demonstrate adding synchronization to a raw shared object."""
    # Create raw array (no built-in synchronization)
    raw_arr = RawArray('i', [0] * 10)
    # Add synchronization wrapper
    sync_arr = synchronized(raw_arr)

    # NOTE(review): a nested function as a Process target only works with the
    # 'fork' start method (it cannot be pickled) — confirm for your platform.
    def sync_worker(arr, worker_id):
        with arr.get_lock():
            for i in range(len(arr)):
                arr[i] += worker_id
                time.sleep(0.01)
        print(f"Worker {worker_id} completed")

    # Use the synchronized array from several processes
    processes = []
    for i in range(1, 4):
        p = Process(target=sync_worker, args=(sync_arr, i))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f"Final array: {list(sync_arr[:])}")

    # Copy shared object
    arr_copy = copy(sync_arr)
    print(f"Copied array: {list(arr_copy[:])}")


if __name__ == '__main__':
    synchronized_access_example()

# Circular-buffer example (producer/consumer over shared ctypes).
from billiard import Process, Array, Value
import time


class CircularBuffer:
    """Fixed-size ring buffer backed by shared ctypes objects."""

    def __init__(self, size):
        self.buffer = Array('i', size)
        self.size = size
        self.head = Value('i', 0)   # Next index to read from
        self.tail = Value('i', 0)   # Next index to write to
        self.count = Value('i', 0)  # Number of items currently stored

    def put(self, item):
        """Append item; return True on success, False if the buffer is full."""
        # The buffer's lock also guards head/tail/count so the four shared
        # objects are always updated consistently as one unit.
        with self.buffer.get_lock():
            if self.count.value < self.size:
                self.buffer[self.tail.value] = item
                self.tail.value = (self.tail.value + 1) % self.size
                self.count.value += 1
                return True
            return False  # Buffer full

    def get(self):
        """Pop and return the oldest item, or None if the buffer is empty."""
        with self.buffer.get_lock():
            if self.count.value > 0:
                item = self.buffer[self.head.value]
                self.head.value = (self.head.value + 1) % self.size
                self.count.value -= 1
                return item
            return None  # Buffer empty


def producer(buffer, items):
    """Push each item, spinning briefly whenever the buffer is full."""
    for item in items:
        while not buffer.put(item):
            time.sleep(0.01)  # Wait if buffer full
        print(f"Produced: {item}")
        time.sleep(0.1)


def consumer(buffer, consumer_id):
    """Drain items until the buffer is observed empty."""
    while True:
        item = buffer.get()
        if item is not None:
            print(f"Consumer {consumer_id} got: {item}")
            time.sleep(0.15)
        else:
            time.sleep(0.05)
            # NOTE(review): this exits on the first empty read, so consumers
            # can stop before the producer finishes; a real implementation
            # needs an explicit termination signal (hence terminate() below).
            break


# Usage
if __name__ == '__main__':
    circ_buffer = CircularBuffer(5)
    prod = Process(target=producer, args=(circ_buffer, range(10)))
    cons1 = Process(target=consumer, args=(circ_buffer, 1))
    cons2 = Process(target=consumer, args=(circ_buffer, 2))

    prod.start()
    cons1.start()
    cons2.start()

    prod.join()
    time.sleep(2)  # Let consumers finish
    cons1.terminate()
    cons2.terminate()

# Shared statistics example.
from billiard import Process, Array, Value
import time
import random


class SharedStats:
    """Running statistics (count/sum/min/max) shared across processes."""

    def __init__(self):
        self.count = Value('i', 0)
        self.sum = Value('d', 0.0)
        # Sentinels so the first update always wins both comparisons.
        self.min_val = Value('d', float('inf'))
        self.max_val = Value('d', float('-inf'))

    def update(self, value):
        """Fold one observation into the shared aggregates."""
        # count's lock guards all four shared values so an update is atomic
        # with respect to get_stats().
        with self.count.get_lock():
            self.count.value += 1
            self.sum.value += value
            if value < self.min_val.value:
                self.min_val.value = value
            if value > self.max_val.value:
                self.max_val.value = value

    def get_stats(self):
        """Return a consistent snapshot dict; {'count': 0} when no data yet."""
        with self.count.get_lock():
            if self.count.value > 0:
                return {
                    'count': self.count.value,
                    'sum': self.sum.value,
                    'avg': self.sum.value / self.count.value,
                    'min': self.min_val.value,
                    'max': self.max_val.value,
                }
            return {'count': 0}


def data_generator(stats, num_values):
    """Feed num_values random observations into the shared stats."""
    for _ in range(num_values):
        value = random.uniform(-100, 100)
        stats.update(value)
        time.sleep(0.01)


def stats_reporter(stats):
    """Periodically print a snapshot of the current statistics."""
    for _ in range(10):
        time.sleep(0.5)
        current_stats = stats.get_stats()
        print(f"Stats: {current_stats}")


# Usage
if __name__ == '__main__':
    stats = SharedStats()

    # Start data generators
    generators = []
    for i in range(3):
        p = Process(target=data_generator, args=(stats, 20))
        generators.append(p)
        p.start()

    # Start reporter
    reporter = Process(target=stats_reporter, args=(stats,))
    reporter.start()

    for p in generators:
        p.join()
    reporter.join()

    final_stats = stats.get_stats()
    print(f"Final stats: {final_stats}")

# lock=True (default)
# Install with Tessl CLI
npx tessl i tessl/pypi-billiard