CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-multiprocess

A fork of Python's multiprocessing module that extends multiprocessing to provide enhanced serialization using dill

Pending
Overview
Eval results
Files

docs/communication.md

Inter-Process Communication

Communication mechanisms for data exchange between processes. Multiprocess provides queues for message passing and pipes for bidirectional communication, with enhanced serialization support via dill.

Capabilities

Queue Classes

Thread- and process-safe queues for passing objects between processes with FIFO semantics.

class Queue:
    """
    A multi-producer, multi-consumer FIFO queue for inter-process communication.

    Args:
        maxsize: maximum number of items the queue may hold; the default
            of 0 means unlimited
    """
    def __init__(self, maxsize: int = 0): ...

    def put(self, item, block: bool = True, timeout=None):
        """
        Put an item into the queue.

        Args:
            item: object to put in queue
            block: if True (the default), block until space is available
            timeout: maximum time to wait, in seconds (default: None)

        Raises:
            queue.Full: if the queue is full and block=False, or the
                timeout is exceeded
        """

    def get(self, block: bool = True, timeout=None):
        """
        Remove and return an item from the queue.

        Args:
            block: if True (the default), block until an item is available
            timeout: maximum time to wait, in seconds (default: None)

        Returns:
            object: item from queue

        Raises:
            queue.Empty: if the queue is empty and block=False, or the
                timeout is exceeded
        """

    def put_nowait(self, item):
        """
        Put an item into the queue without blocking.

        Args:
            item: object to put in queue

        Raises:
            queue.Full: if queue is full
        """

    def get_nowait(self):
        """
        Remove and return an item from the queue without blocking.

        Returns:
            object: item from queue

        Raises:
            queue.Empty: if queue is empty
        """

    def empty(self) -> bool:
        """
        Return True if the queue is empty.

        The answer is approximate, so do not rely on it for synchronization.

        Returns:
            bool: True if queue is empty (approximate)
        """

    def full(self) -> bool:
        """
        Return True if the queue is full.

        The answer is approximate, so do not rely on it for synchronization.

        Returns:
            bool: True if queue is full (approximate)
        """

    def qsize(self) -> int:
        """
        Return the approximate size of the queue.

        Returns:
            int: approximate number of items in queue
        """

    def close(self):
        """Indicate that no more data will be put on this queue."""

    def join_thread(self):
        """Join the background thread used by the queue."""

JoinableQueue Class

Queue with task tracking capabilities for producer-consumer patterns.

class JoinableQueue(Queue):
    """
    A Queue subclass that adds task tracking via task_done() and join().

    Args:
        maxsize: maximum number of items the queue may hold; the default
            of 0 means unlimited
    """
    def __init__(self, maxsize: int = 0): ...

    def task_done(self):
        """
        Indicate that a formerly enqueued task is complete.

        Must be called once for each item retrieved from the queue; each
        call lowers the count of unfinished tasks that join() waits on.
        """

    def join(self):
        """
        Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up when items are added and
        goes down when task_done() is called.
        """

SimpleQueue Class

Simplified queue implementation with minimal overhead.

class SimpleQueue:
    """
    A simplified queue implementation with lower overhead.

    Its put() and get() take no block or timeout arguments, and it has no
    size limit parameter.
    """
    def __init__(self): ...

    def put(self, item):
        """
        Put an item into the queue.

        Args:
            item: object to put in queue
        """

    def get(self):
        """
        Remove and return an item from the queue.

        Returns:
            object: item from queue
        """

    def empty(self) -> bool:
        """
        Return True if the queue is empty.

        Returns:
            bool: True if queue is empty
        """

    def close(self):
        """Indicate that no more data will be put on this queue."""

Pipe Communication

Communication channels between two processes: bidirectional by default, or unidirectional when created with duplex=False.

def Pipe(duplex: bool = True) -> "tuple[Connection, Connection]":
    """
    Create a pipe between two processes.

    Args:
        duplex: if True (the default), pipe is bidirectional; if False,
            unidirectional

    Returns:
        tuple[Connection, Connection]: pair of Connection objects, one for
            each end of the pipe
    """

Connection Objects

class Connection:
    """
    Connection object for pipe communication.

    Offers object-level messaging (send/recv) and raw byte messaging
    (send_bytes/recv_bytes/recv_bytes_into).
    """
    def send(self, obj):
        """
        Send an object through the connection.

        Args:
            obj: object to send
        """

    def recv(self):
        """
        Receive an object from the connection.

        Returns:
            object: received object
        """

    def send_bytes(self, buffer, offset: int = 0, size=None):
        """
        Send byte data through the connection.

        Args:
            buffer: bytes-like object to send
            offset: starting position in buffer (default: 0)
            size: number of bytes to send (default: None)
        """

    def recv_bytes(self, maxlength=None) -> bytes:
        """
        Receive byte data from the connection.

        Args:
            maxlength: maximum number of bytes to receive (default: None)

        Returns:
            bytes: received byte data
        """

    def recv_bytes_into(self, buffer, offset: int = 0) -> int:
        """
        Receive byte data into an existing buffer.

        Args:
            buffer: writable buffer to receive into
            offset: starting position in buffer (default: 0)

        Returns:
            int: number of bytes received
        """

    def poll(self, timeout: float = 0.0) -> bool:
        """
        Check if data is available for reading.

        Args:
            timeout: time to wait for data, in seconds (default: 0.0)

        Returns:
            bool: True if data is available
        """

    def close(self):
        """Close the connection."""

    # Properties
    readable: bool  # True if connection can receive
    writable: bool  # True if connection can send
    closed: bool    # True if connection is closed

Advanced Connection Functions

Additional functions for working with connections and sockets.

def wait(object_list, timeout=None) -> list:
    """
    Wait until one or more objects in object_list are ready.

    Args:
        object_list: list of Connection objects or other waitable objects
        timeout: maximum time to wait, in seconds (default: None)

    Returns:
        list: subset of object_list that are ready
    """

class Listener:
    """
    A wrapper for a bound socket which is 'listening' for connections.

    Args:
        address: address to bind to (default: None)
        family: socket family (default: None for auto-detection)
        backlog: maximum number of pending connections (default: 1)
        authkey: authentication key for connections; None (the default)
            disables authentication
    """
    def __init__(self, address=None, family=None, backlog=1, authkey=None): ...

    def accept(self):
        """
        Accept a connection on the bound socket.

        Returns:
            Connection: new connection object

        Raises:
            AuthenticationError: if authentication is attempted and fails
        """

    def close(self):
        """Close the listener."""

    # Properties
    # String annotations keep this stub importable on its own and on
    # interpreters that predate the ``X | Y`` union syntax.
    address: "tuple | str"  # Address the listener is bound to (tuple for TCP, str for Unix sockets / named pipes)
    last_accepted: "tuple | str | None"  # Address the last accepted connection came from; None if unavailable

def Client(address, family=None, authkey=None) -> "Connection":
    """
    Connect to a Listener and return a Connection object.

    Args:
        address: address to connect to
        family: socket family (default: None for auto-detection)
        authkey: authentication key (default: None)

    Returns:
        Connection: connection object
    """

Usage Examples

Basic Queue Communication

from multiprocess import Process, Queue

def producer(q, items):
    """
    Announce and enqueue every element of ``items``, then a ``None`` sentinel.

    Args:
        q: queue-like object exposing ``put``
        items: iterable of objects to send
    """
    for entry in items:
        print(f"Producing {entry}")
        q.put(entry)
    # None marks the end of the stream for whoever reads the queue.
    q.put(None)

def consumer(q):
    """
    Print items taken from ``q`` until a ``None`` sentinel is received.

    Args:
        q: queue-like object exposing a blocking ``get``
    """
    current = q.get()
    while current is not None:
        print(f"Consuming {current}")
        current = q.get()

# The guard keeps child processes started with the "spawn" or "forkserver"
# start method from re-running this driver when they re-import the module.
if __name__ == "__main__":
    # Create queue
    queue = Queue()

    # Create processes
    items_to_produce = ['item1', 'item2', 'item3', 'item4']
    prod = Process(target=producer, args=(queue, items_to_produce))
    cons = Process(target=consumer, args=(queue,))

    # Start processes
    prod.start()
    cons.start()

    # Wait for completion
    prod.join()
    cons.join()

JoinableQueue with Task Tracking

from multiprocess import Process, JoinableQueue
import time

def worker(q):
    """
    Process items from a joinable queue until a ``None`` sentinel arrives.

    ``task_done()`` is called exactly once for every item retrieved,
    including the sentinel and items whose processing raises, so the
    queue's unfinished-task count always returns to zero and ``join()``
    cannot hang on a failed or sentinel item.

    Args:
        q: joinable queue exposing ``get`` and ``task_done``
    """
    while True:
        item = q.get()
        try:
            if item is None:
                break
            print(f"Processing {item}")
            time.sleep(1)  # Simulate work
        finally:
            # Runs on the normal path, on ``break`` and on exceptions.
            q.task_done()

def add_tasks(q, tasks):
    """
    Enqueue each element of ``tasks`` on ``q`` in iteration order.

    Args:
        q: queue-like object exposing ``put``
        tasks: iterable of work items
    """
    for job in tasks:
        q.put(job)

# The guard keeps child processes started with the "spawn" or "forkserver"
# start method from re-running this driver when they re-import the module.
if __name__ == "__main__":
    # Create joinable queue
    q = JoinableQueue()

    # Start worker processes
    workers = []
    for _ in range(2):
        p = Process(target=worker, args=(q,))
        p.start()
        workers.append(p)

    # Add tasks (reuses the add_tasks helper instead of repeating its loop)
    tasks = [f"task-{i}" for i in range(5)]
    add_tasks(q, tasks)

    # Wait for all tasks to complete
    q.join()
    print("All tasks completed")

    # Stop workers: one sentinel per worker
    for _ in workers:
        q.put(None)
    for p in workers:
        p.join()

Pipe Communication

from multiprocess import Process, Pipe

def sender(conn, messages):
    """
    Send each message over ``conn``, then the "DONE" marker, then close it.

    Args:
        conn: connection-like object exposing ``send`` and ``close``
        messages: iterable of objects to transmit
    """
    for payload in messages:
        print(f"Sending: {payload}")
        conn.send(payload)
    # "DONE" tells the receiving side that nothing more will follow.
    conn.send("DONE")
    conn.close()

def receiver(conn):
    """
    Print every message read from ``conn`` up to and including "DONE".

    The connection is closed once the "DONE" marker has been printed.

    Args:
        conn: connection-like object exposing ``recv`` and ``close``
    """
    while True:
        incoming = conn.recv()
        print(f"Received: {incoming}")
        if incoming == "DONE":
            conn.close()
            return

# The guard keeps child processes started with the "spawn" or "forkserver"
# start method from re-running this driver when they re-import the module.
if __name__ == "__main__":
    # Create pipe
    parent_conn, child_conn = Pipe()

    # Create processes
    messages = ["Hello", "World", "From", "Pipe"]
    p1 = Process(target=sender, args=(parent_conn, messages))
    # The trailing comma matters: ``(child_conn)`` is just ``child_conn``,
    # not a one-element tuple, and Process would fail to unpack it.
    p2 = Process(target=receiver, args=(child_conn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

Bidirectional Pipe Communication

from multiprocess import Process, Pipe
import time

def ping_pong(conn, name, count):
    """
    Exchange ``count`` request/response rounds over a duplex connection.

    The side named "ping" sends first and then waits for the reply; any
    other name receives first and then answers with a "pong" message.
    Each round ends with a half-second pause.

    Args:
        conn: connection-like object exposing ``send`` and ``recv``
        name: "ping" for the initiating side, anything else for the responder
        count: number of rounds to play
    """
    initiates = name == "ping"
    for round_no in range(count):
        if initiates:
            outgoing = f"ping-{round_no}"
            conn.send(outgoing)
            print(f"Sent: {outgoing}")
            print(f"Received: {conn.recv()}")
        else:
            print(f"Received: {conn.recv()}")
            outgoing = f"pong-{round_no}"
            conn.send(outgoing)
            print(f"Sent: {outgoing}")
        time.sleep(0.5)

# The guard keeps child processes started with the "spawn" or "forkserver"
# start method from re-running this driver when they re-import the module.
if __name__ == "__main__":
    # Create duplex pipe
    conn1, conn2 = Pipe(duplex=True)

    # Create processes: one end initiates ("ping"), the other answers ("pong")
    p1 = Process(target=ping_pong, args=(conn1, "ping", 3))
    p2 = Process(target=ping_pong, args=(conn2, "pong", 3))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

Multiple Producers and Consumers

from multiprocess import Process, Queue
import random
import time

def producer(q, producer_id, num_items):
    """
    Enqueue ``num_items`` labelled items, pausing randomly after each one.

    Args:
        q: queue-like object exposing ``put``
        producer_id: identifier embedded in every item label
        num_items: how many items to produce
    """
    for seq in range(num_items):
        payload = f"Producer-{producer_id}-Item-{seq}"
        q.put(payload)
        print(f"Produced: {payload}")
        # Random pause between 0.1 and 0.5 seconds after each item.
        time.sleep(random.uniform(0.1, 0.5))
    
def consumer(q, consumer_id):
    """
    Consume items from ``q`` until no item arrives within two seconds.

    Only the queue's timeout (``queue.Empty``) ends the loop. Any other
    exception propagates instead of being reported as a timeout.

    Args:
        q: queue-like object whose ``get`` accepts ``timeout``
        consumer_id: identifier used in the printed messages
    """
    # Function-scope import keeps the snippet self-contained. It is resolved
    # by module name, so a global variable called ``queue`` does not interfere.
    from queue import Empty

    while True:
        # Keep the try body to the one call that signals the timeout.
        try:
            item = q.get(timeout=2)
        except Empty:
            print(f"Consumer-{consumer_id} timed out, exiting")
            break
        print(f"Consumer-{consumer_id} consumed: {item}")
        time.sleep(random.uniform(0.2, 0.8))

# The guard keeps child processes started with the "spawn" or "forkserver"
# start method from re-running this driver when they re-import the module.
if __name__ == "__main__":
    # Create queue
    queue = Queue()

    # Create multiple producers
    producers = []
    for i in range(2):
        p = Process(target=producer, args=(queue, i, 5))
        p.start()
        producers.append(p)

    # Create multiple consumers
    consumers = []
    for i in range(3):
        p = Process(target=consumer, args=(queue, i))
        p.start()
        consumers.append(p)

    # Wait for producers to finish
    for p in producers:
        p.join()

    # Wait for consumers to finish (they exit once the queue stays empty
    # long enough for their get() to time out)
    for p in consumers:
        p.join()

Enhanced Serialization Example

from multiprocess import Process, Queue
import pickle

# Complex object that requires dill serialization
class ComplexObject:
    """Bundle a callable with a fixed list of sample values."""

    def __init__(self, func):
        """
        Args:
            func: one-argument callable applied to each value by process()
        """
        self.data = [1, 2, 3, 4, 5]
        self.func = func

    def process(self):
        """
        Apply the stored callable to every value in ``data``.

        Returns:
            list: one result per value, in the same order
        """
        return list(map(self.func, self.data))

def worker(q):
    """
    Run ``process()`` on each object taken from ``q`` and print the outcome.

    Stops as soon as a ``None`` sentinel is received.

    Args:
        q: queue-like object exposing a blocking ``get``
    """
    received = q.get()
    while received is not None:
        outcome = received.process()
        print(f"Worker result: {outcome}")
        received = q.get()

# The guard keeps child processes started with the "spawn" or "forkserver"
# start method from re-running this driver when they re-import the module.
if __name__ == "__main__":
    # Create object with lambda function (requires dill)
    complex_obj = ComplexObject(lambda x: x ** 2)

    # Create queue and process
    queue = Queue()
    p = Process(target=worker, args=(queue,))
    p.start()

    # Send complex object (automatically serialized with dill)
    queue.put(complex_obj)
    queue.put(None)  # Signal completion

    p.join()

Connection with Authentication

from multiprocess import Process
from multiprocess.connection import Listener, Client

def server(address, authkey):
    """
    Accept one authenticated connection and echo messages until "quit".

    The loop also ends when the peer closes the connection (EOFError).

    Args:
        address: address for the Listener to bind to
        authkey: authentication key passed to the Listener
    """
    with Listener(address, authkey=authkey) as listener:
        print(f"Server listening on {address}")
        with listener.accept() as conn:
            print("Connection accepted")
            try:
                while True:
                    incoming = conn.recv()
                    print(f"Server received: {incoming}")
                    if incoming == "quit":
                        break
                    conn.send(f"Echo: {incoming}")
            except EOFError:
                # Peer hung up; leave the loop just as "quit" would.
                pass

def client(address, authkey):
    """
    Connect to the server, send three messages and print the echoes.

    No reply is read after the final "quit" message.

    Args:
        address: address of the listening server
        authkey: authentication key passed to Client
    """
    with Client(address, authkey=authkey) as conn:
        for outgoing in ("hello", "world", "quit"):
            conn.send(outgoing)
            if outgoing == "quit":
                continue
            reply = conn.recv()
            print(f"Client received: {reply}")

# The guard keeps child processes started with the "spawn" or "forkserver"
# start method from re-running this driver when they re-import the module.
if __name__ == "__main__":
    import time

    # Authentication key (demo value only; use a real secret in practice)
    authkey = b'secret_key'
    address = ('localhost', 6000)

    # Start server process
    server_process = Process(target=server, args=(address, authkey))
    server_process.start()

    # Give server time to start (a fixed delay is a simplification; the
    # client fails to connect if the listener is not up yet)
    time.sleep(0.5)

    # Start client process
    client_process = Process(target=client, args=(address, authkey))
    client_process.start()

    client_process.join()
    server_process.join()

Install with Tessl CLI

npx tessl i tessl/pypi-multiprocess

docs

communication.md

context-config.md

index.md

pools.md

process-management.md

shared-objects.md

synchronization.md

tile.json