CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-billiard

Python multiprocessing fork with improvements and bugfixes for distributed task processing

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/shared-memory.md

Shared Memory

Synchronized and unsynchronized shared memory objects for efficient data sharing between processes using ctypes-based values and arrays.

Capabilities

Shared Values

Create shared values that can be accessed by multiple processes with optional synchronization.

def Value(typecode_or_type, *args, lock=True, ctx=None):
    """
    Create a shared ctypes value, synchronized by default.

    Parameters:
    - typecode_or_type: ctypes type or single character typecode (e.g. 'i', 'd')
    - *args: initialization arguments forwarded to the underlying ctypes value
    - lock: if True (default), operations are synchronized by a lock exposed
      through .get_lock(); if False, access is unsynchronized
    - ctx: multiprocessing context used when creating the lock

    Returns:
    SynchronizedBase wrapper around the ctypes value; the data is read and
    written through its .value attribute. (NOTE(review): with lock=False,
    billiard — like multiprocessing — presumably returns the raw ctypes
    object instead of a wrapper; confirm against billiard.sharedctypes.)
    """

def RawValue(typecode_or_type, *args):
    """
    Create an unsynchronized shared ctypes value.

    Parameters:
    - typecode_or_type: ctypes type or single character typecode (e.g. 'i')
    - *args: initialization arguments for the underlying ctypes value

    Returns:
    Raw ctypes value with no lock; concurrent read-modify-write access from
    multiple processes can race (see the raw_value_worker example below)
    """

Common typecodes:

  • 'i' - signed int
  • 'f' - float
  • 'd' - double
  • 'c' - char
  • 'b' - signed char
  • 'B' - unsigned char
  • 'h' - short
  • 'l' - long

Usage example:

from billiard import Process, Value, RawValue
import time
import ctypes

def worker_with_shared_value(shared_counter, worker_id, iterations):
    """Safely increment *shared_counter* `iterations` times under its lock."""
    for _ in range(iterations):
        # Hold the lock across the whole read-modify-write so no other
        # process can interleave between the read and the store.
        with shared_counter.get_lock():
            current = shared_counter.value
            time.sleep(0.001)  # Simulate some work
            shared_counter.value = current + 1
            print(f"Worker {worker_id}: counter = {shared_counter.value}")

def raw_value_worker(raw_val, worker_id):
    """Bump *raw_val* five times with no locking (races are possible!)."""
    for _ in range(5):
        raw_val.value += 1  # unsynchronized read-modify-write
        print(f"Worker {worker_id}: raw value = {raw_val.value}")
        time.sleep(0.1)

if __name__ == '__main__':
    # --- synchronized shared value --------------------------------------
    counter = Value('i', 0)  # 'i' => C signed int, initialized to 0
    print(f"Initial counter value: {counter.value}")

    # Three workers increment the counter 5 times each, under its lock.
    workers = [
        Process(target=worker_with_shared_value, args=(counter, i, 5))
        for i in range(3)
    ]
    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()

    print(f"Final counter value: {counter.value}")

    # Explicit ctypes types work as well as one-character typecodes.
    float_value = Value(ctypes.c_double, 3.14159)
    print(f"Float value: {float_value.value}")

    # --- raw (unsynchronized) shared value ------------------------------
    raw_counter = RawValue('i', 0)

    # These workers update the raw value without a lock, so lost updates
    # (race conditions) are possible.
    raw_workers = [
        Process(target=raw_value_worker, args=(raw_counter, i))
        for i in range(2)
    ]
    for proc in raw_workers:
        proc.start()
    for proc in raw_workers:
        proc.join()

    print(f"Final raw counter: {raw_counter.value}")

Shared Arrays

Create shared arrays that can be accessed by multiple processes with optional synchronization.

def Array(typecode_or_type, size_or_initializer, lock=True, ctx=None):
    """
    Create a shared ctypes array, synchronized by default.

    Parameters:
    - typecode_or_type: ctypes type or single character typecode
    - size_or_initializer: int length (zero-filled) or a sequence whose
      values initialize the array
    - lock: if True (default), element access is synchronized by a lock
      exposed through .get_lock(); if False, access is unsynchronized
    - ctx: multiprocessing context used when creating the lock

    Returns:
    SynchronizedArray wrapper supporting len(), indexing, and slicing
    """

def RawArray(typecode_or_type, size_or_initializer):
    """
    Create an unsynchronized shared ctypes array.

    Parameters:
    - typecode_or_type: ctypes type or single character typecode
    - size_or_initializer: int length (zero-filled) or a sequence whose
      values initialize the array

    Returns:
    Raw ctypes array with no lock; wrap it with synchronized() if locked
    access is needed
    """

Usage example:

from billiard import Process, Array, RawArray
import time

def array_worker(shared_array, worker_id, start_idx, count):
    """Write worker-tagged values into shared_array[start_idx:start_idx+count]."""
    # One lock acquisition covers every slot this worker touches.
    with shared_array.get_lock():
        for offset in range(count):
            idx = start_idx + offset
            if idx >= len(shared_array):
                continue  # slot past the end of the array -- skip it
            shared_array[idx] = worker_id * 100 + offset
            print(f"Worker {worker_id}: set array[{idx}] = {shared_array[idx]}")
            time.sleep(0.1)

def array_reader(shared_array):
    """Print the contents and sum of *shared_array* after a short delay."""
    time.sleep(1)  # give the writer processes a head start

    # Snapshot and report under the lock so the values are consistent.
    with shared_array.get_lock():
        snapshot = list(shared_array[:])
        print("Array contents:", snapshot)
        print("Array sum:", sum(snapshot))

def matrix_worker(matrix, row, cols):
    """Fill one row of a flattened row-major matrix with sequential values."""
    base = row * cols  # flat index of the first cell in this row
    for col in range(cols):
        matrix[base + col] = base + col + 1  # 1-based sequential fill
        time.sleep(0.05)

if __name__ == '__main__':
    # Synchronized integer array, ten slots, zero-initialized.
    shared_arr = Array('i', 10)
    print(f"Initial array: {list(shared_arr[:])}")

    # Three writers each own a 3-slot region of the array.
    processes = []
    for i in range(3):
        writer = Process(target=array_worker, args=(shared_arr, i, i * 3, 3))
        processes.append(writer)
        writer.start()

    # One reader prints the contents once the writers have had a chance.
    reader_proc = Process(target=array_reader, args=(shared_arr,))
    reader_proc.start()
    processes.append(reader_proc)

    for proc in processes:
        proc.join()

    # Arrays can also be built from an existing sequence.
    initialized_array = Array('i', [1, 2, 3, 4, 5])
    print(f"Initialized array: {list(initialized_array[:])}")

    # A 3x4 matrix stored flat in row-major order.
    rows, cols = 3, 4
    matrix = Array('i', rows * cols)

    # Fill each row from its own process.
    matrix_procs = []
    for row in range(rows):
        filler = Process(target=matrix_worker, args=(matrix, row, cols))
        matrix_procs.append(filler)
        filler.start()

    for proc in matrix_procs:
        proc.join()

    # Print the matrix one row at a time.
    print("Matrix:")
    for row in range(rows):
        print([matrix[row * cols + col] for col in range(cols)])

Shared Memory Utilities

Additional functions for working with shared memory objects.

def copy(obj):
    """
    Create a copy of a shared ctypes object.

    Parameters:
    - obj: shared object (value or array) to duplicate

    Returns:
    Copy of the shared object holding the same data as obj at call time
    """

def synchronized(obj, lock=None, ctx=None):
    """
    Wrap a raw shared ctypes object with process-safe synchronization.

    Parameters:
    - obj: raw shared object (e.g. from RawValue/RawArray) to wrap
    - lock: lock to guard access with (a new Lock is created if None)
    - ctx: multiprocessing context used when creating the lock

    Returns:
    Synchronized wrapper exposing the same data plus .get_lock()
    """

Usage example:

from billiard import Process, RawArray, Lock
from billiard.sharedctypes import synchronized, copy
import time

def synchronized_access_example():
    """Wrap a RawArray in a lock and mutate it safely from several processes."""
    # Raw arrays carry no lock of their own.
    raw_arr = RawArray('i', [0] * 10)

    # synchronized() bolts a lock onto the raw object.
    sync_arr = synchronized(raw_arr)

    def sync_worker(arr, worker_id):
        # Each worker adds its id to every slot, all under one lock.
        with arr.get_lock():
            for i in range(len(arr)):
                arr[i] += worker_id
                time.sleep(0.01)
            print(f"Worker {worker_id} completed")

    procs = []
    for worker_id in range(1, 4):
        proc = Process(target=sync_worker, args=(sync_arr, worker_id))
        procs.append(proc)
        proc.start()

    for proc in procs:
        proc.join()

    print(f"Final array: {list(sync_arr[:])}")

    # copy() duplicates the shared object's current contents.
    arr_copy = copy(sync_arr)
    print(f"Copied array: {list(arr_copy[:])}")

if __name__ == '__main__':
    synchronized_access_example()

Advanced Shared Memory Patterns

Circular Buffer

from billiard import Process, Array, Value
import time

class CircularBuffer:
    """Fixed-size FIFO ring buffer backed by shared memory.

    Head, tail, and count live in shared Values so every process sees the
    same bookkeeping; the backing Array's lock guards both put() and get().
    """

    def __init__(self, size):
        self.buffer = Array('i', size)  # storage slots
        self.size = size                # fixed capacity
        self.head = Value('i', 0)       # next index to read
        self.tail = Value('i', 0)       # next index to write
        self.count = Value('i', 0)      # items currently held

    def put(self, item):
        """Append *item*; return True on success, False if the buffer is full."""
        with self.buffer.get_lock():
            if self.count.value >= self.size:
                return False  # Buffer full
            self.buffer[self.tail.value] = item
            self.tail.value = (self.tail.value + 1) % self.size
            self.count.value += 1
            return True

    def get(self):
        """Pop and return the oldest item, or None if the buffer is empty."""
        with self.buffer.get_lock():
            if self.count.value == 0:
                return None  # Buffer empty
            item = self.buffer[self.head.value]
            self.head.value = (self.head.value + 1) % self.size
            self.count.value -= 1
            return item

def producer(buffer, items):
    """Feed every element of *items* into *buffer*, waiting out full states."""
    for item in items:
        # put() returns False while the buffer is full; back off and retry.
        placed = buffer.put(item)
        while not placed:
            time.sleep(0.01)
            placed = buffer.put(item)
        print(f"Produced: {item}")
        time.sleep(0.1)

def consumer(buffer, consumer_id):
    """Drain *buffer* until a read comes back empty, then stop."""
    while True:
        item = buffer.get()
        if item is None:
            # Nothing available: pause once, then treat it as termination.
            time.sleep(0.05)
            break
        print(f"Consumer {consumer_id} got: {item}")
        time.sleep(0.15)

# Usage
if __name__ == '__main__':
    circ_buffer = CircularBuffer(5)

    # One producer feeds ten items; two consumers compete to drain them.
    prod = Process(target=producer, args=(circ_buffer, range(10)))
    consumers = [Process(target=consumer, args=(circ_buffer, n)) for n in (1, 2)]

    prod.start()
    for cons in consumers:
        cons.start()

    prod.join()
    time.sleep(2)  # give the consumers time to drain the buffer
    for cons in consumers:
        cons.terminate()  # consumers may still be looping; stop them hard

Shared Statistics

from billiard import Process, Array, Value
import time
import random

class SharedStats:
    """Running count/sum/min/max statistics shared across processes.

    The count Value's lock serializes every update and every read, so the
    four fields always change together.
    """

    def __init__(self):
        self.count = Value('i', 0)                # samples seen so far
        self.sum = Value('d', 0.0)                # running total
        self.min_val = Value('d', float('inf'))   # smallest sample so far
        self.max_val = Value('d', float('-inf'))  # largest sample so far

    def update(self, value):
        """Fold one sample into the running statistics."""
        with self.count.get_lock():
            self.count.value += 1
            self.sum.value += value
            self.min_val.value = min(self.min_val.value, value)
            self.max_val.value = max(self.max_val.value, value)

    def get_stats(self):
        """Return a snapshot dict of the stats, or {'count': 0} when empty."""
        with self.count.get_lock():
            n = self.count.value
            if n == 0:
                return {'count': 0}
            return {
                'count': n,
                'sum': self.sum.value,
                'avg': self.sum.value / n,
                'min': self.min_val.value,
                'max': self.max_val.value,
            }

def data_generator(stats, num_values):
    """Feed *num_values* random samples from [-100, 100] into *stats*."""
    for _ in range(num_values):
        stats.update(random.uniform(-100, 100))
        time.sleep(0.01)

def stats_reporter(stats):
    """Print a stats snapshot twice a second, ten times in total."""
    for _ in range(10):
        time.sleep(0.5)
        current_stats = stats.get_stats()
        print(f"Stats: {current_stats}")

# Usage
if __name__ == '__main__':
    stats = SharedStats()

    # Three generators each contribute 20 random samples.
    generators = []
    for _ in range(3):
        gen = Process(target=data_generator, args=(stats, 20))
        generators.append(gen)
        gen.start()

    # One reporter prints periodic snapshots while data arrives.
    reporter = Process(target=stats_reporter, args=(stats,))
    reporter.start()

    for gen in generators:
        gen.join()

    reporter.join()

    final_stats = stats.get_stats()
    print(f"Final stats: {final_stats}")

Memory Layout and Performance

  • Shared values and arrays reside in shared memory accessible by all processes
  • Synchronization overhead occurs only when lock=True (default)
  • Raw values/arrays have no synchronization overhead but require manual coordination
  • ctypes integration provides direct memory access with C-compatible data types
  • Initialization can be done with size (zeros) or from existing sequences

Install with Tessl CLI

npx tessl i tessl/pypi-billiard

docs

communication.md

context-management.md

index.md

managers.md

process-management.md

process-pools.md

queues.md

shared-memory.md

synchronization.md

tile.json