CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-billiard

Python multiprocessing fork with improvements and bugfixes for distributed task processing

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

communication.mddocs/

Communication

Inter-process communication through pipes and connections with support for both object and byte-level messaging, listeners, clients, and connection management.

Capabilities

Pipes

Create pairs of connected objects for bidirectional communication between processes.

def Pipe(duplex=True, rnonblock=False, wnonblock=False) -> tuple[Connection, Connection]:
    """
    Create a pair of connected Connection objects.
    
    Parameters:
    - duplex: if True (default), pipe is bidirectional; if False, unidirectional
    - rnonblock: if True, read operations are non-blocking
    - wnonblock: if True, write operations are non-blocking
    
    Returns:
    Tuple of (Connection, Connection) objects
    """

Usage example:

from billiard import Process, Pipe
import time

def sender(conn, messages):
    """Send messages through connection"""
    for msg in messages:
        print(f"Sending: {msg}")
        conn.send(msg)
        time.sleep(0.5)
    conn.close()

def receiver(conn):
    """Receive messages from connection"""
    while True:
        try:
            msg = conn.recv()
            print(f"Received: {msg}")
        except EOFError:
            break
    conn.close()

if __name__ == '__main__':
    # Create pipe
    parent_conn, child_conn = Pipe()
    
    messages = ["Hello", "World", "From", "Billiard"]
    
    # Start processes
    sender_proc = Process(target=sender, args=(parent_conn, messages))
    receiver_proc = Process(target=receiver, args=(child_conn))
    
    sender_proc.start()
    receiver_proc.start()
    
    sender_proc.join()
    receiver_proc.join()

Connections

Connection objects provide methods for sending and receiving data between processes.

class Connection:
    """
    Connection object for inter-process communication.
    """
    def send(self, obj):
        """
        Send an object through the connection.
        
        Parameters:
        - obj: object to send (must be picklable)
        """
    
    def recv(self):
        """
        Receive an object from the connection.
        
        Returns:
        Object received from connection
        
        Raises:
        - EOFError: if connection is closed
        """
    
    def send_bytes(self, buf, offset=0, size=None):
        """
        Send bytes through the connection.
        
        Parameters:
        - buf: bytes-like object to send
        - offset: offset in buffer to start from
        - size: number of bytes to send (None for all remaining)
        """
    
    def recv_bytes(self, maxlength=None) -> bytes:
        """
        Receive bytes from the connection.
        
        Parameters:
        - maxlength: maximum number of bytes to receive
        
        Returns:
        Bytes received from connection
        
        Raises:
        - EOFError: if connection is closed
        - OSError: if message too long for maxlength
        """
    
    def recv_bytes_into(self, buf, offset=0) -> int:
        """
        Receive bytes into an existing buffer.
        
        Parameters:
        - buf: writable buffer to receive into
        - offset: offset in buffer to start writing
        
        Returns:
        Number of bytes received
        """
    
    def poll(self, timeout=None) -> bool:
        """
        Check if data is available for reading.
        
        Parameters:
        - timeout: timeout in seconds (None for non-blocking check)
        
        Returns:
        True if data is available, False otherwise
        """
    
    def close(self):
        """
        Close the connection.
        """
    
    @property
    def closed(self) -> bool:
        """True if connection is closed."""
    
    @property
    def readable(self) -> bool:
        """True if connection is readable."""
    
    @property
    def writable(self) -> bool:
        """True if connection is writable."""

Usage example:

from billiard import Process, Pipe
import time

def byte_sender(conn):
    """Send raw bytes through connection"""
    messages = [b"Hello", b"World", b"Bytes"]
    
    for msg in messages:
        print(f"Sending bytes: {msg}")
        conn.send_bytes(msg)
        time.sleep(0.2)
    
    conn.close()

def byte_receiver(conn):
    """Receive raw bytes from connection"""
    while True:
        if conn.poll(timeout=1):
            try:
                data = conn.recv_bytes(maxlength=1024)
                print(f"Received bytes: {data}")
            except EOFError:
                break
        else:
            print("No data available")
            break
    
    conn.close()

def polling_example():
    """Demonstrate connection polling"""
    parent_conn, child_conn = Pipe()
    
    # Start byte communication processes
    sender_proc = Process(target=byte_sender, args=(parent_conn,))
    receiver_proc = Process(target=byte_receiver, args=(child_conn,))
    
    sender_proc.start()
    receiver_proc.start()
    
    sender_proc.join()
    receiver_proc.join()

if __name__ == '__main__':
    polling_example()

Listeners and Clients

Server-client communication using listeners and clients for network-style IPC.

class Listener:
    """
    A listener for incoming connections.
    """
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        """
        Create a listener.
        
        Parameters:
        - address: address to bind to
        - family: address family
        - backlog: maximum number of pending connections
        - authkey: authentication key
        """
    
    def accept(self) -> Connection:
        """
        Accept a connection and return Connection object.
        
        Returns:
        Connection object for accepted connection
        """
    
    def close(self):
        """
        Close the listener.
        """
    
    @property
    def address(self):
        """Address of the listener."""
    
    @property
    def last_accepted(self):
        """Address of last accepted connection."""

def Client(address, family=None, authkey=None) -> Connection:
    """
    Create a client connection.
    
    Parameters:
    - address: address to connect to
    - family: address family
    - authkey: authentication key
    
    Returns:
    Connection object
    """

Usage example:

from billiard import Process
from billiard.connection import Listener, Client
import time

def server_process(address):
    """Server that accepts connections"""
    with Listener(address, authkey=b'secret') as listener:
        print(f"Server listening on {listener.address}")
        
        # Accept multiple connections
        for i in range(3):
            print(f"Waiting for connection {i+1}...")
            conn = listener.accept()
            print(f"Connection {i+1} from {listener.last_accepted}")
            
            # Handle client
            try:
                while True:
                    msg = conn.recv()
                    print(f"Server received: {msg}")
                    conn.send(f"Echo: {msg}")
            except EOFError:
                print(f"Client {i+1} disconnected")
            finally:
                conn.close()

def client_process(address, client_id):
    """Client that connects to server"""
    try:
        conn = Client(address, authkey=b'secret')
        print(f"Client {client_id} connected")
        
        # Send messages
        for i in range(3):
            msg = f"Message {i} from client {client_id}"
            conn.send(msg)
            response = conn.recv()
            print(f"Client {client_id} got response: {response}")
            time.sleep(0.5)
        
        conn.close()
        print(f"Client {client_id} finished")
        
    except Exception as e:
        print(f"Client {client_id} error: {e}")

if __name__ == '__main__':
    # Use named pipe on Unix or localhost on Windows
    import sys
    if sys.platform == 'win32':
        address = ('localhost', 6000)
    else:
        address = '/tmp/billiard_socket'
    
    # Start server
    server_proc = Process(target=server_process, args=(address,))
    server_proc.start()
    
    time.sleep(0.5)  # Let server start
    
    # Start clients
    clients = []
    for i in range(3):
        client_proc = Process(target=client_process, args=(address, i))
        clients.append(client_proc)
        client_proc.start()
    
    # Wait for completion
    for client_proc in clients:
        client_proc.join()
    
    server_proc.join()

Connection Utilities

Utility functions for working with multiple connections.

def wait(object_list, timeout=None) -> list:
    """
    Wait until one or more connections/objects are ready.
    
    Parameters:
    - object_list: list of Connection objects or other waitable objects
    - timeout: timeout in seconds (None for no timeout)
    
    Returns:
    List of objects that are ready for reading
    """

Usage example:

from billiard import Process, Pipe
from billiard.connection import wait
import time
import random

def delayed_sender(conn, delay, message):
    """Send message after delay"""
    time.sleep(delay)
    conn.send(message)
    conn.close()

def multi_connection_wait():
    """Demonstrate waiting on multiple connections"""
    connections = []
    processes = []
    
    # Create multiple pipes with different delays
    for i in range(4):
        parent_conn, child_conn = Pipe()
        connections.append(parent_conn)
        
        delay = random.uniform(0.5, 2.0)
        message = f"Message from connection {i}"
        
        proc = Process(target=delayed_sender, args=(child_conn, delay, message))
        processes.append(proc)
        proc.start()
    
    print("Waiting for messages from multiple connections...")
    
    # Wait for connections to become ready
    ready_count = 0
    while ready_count < len(connections):
        ready = wait(connections, timeout=3.0)
        
        if ready:
            print(f"{len(ready)} connections ready")
            for conn in ready:
                try:
                    msg = conn.recv()
                    print(f"Received: {msg}")
                    ready_count += 1
                    connections.remove(conn)
                except EOFError:
                    pass
        else:
            print("Timeout waiting for connections")
            break
    
    # Clean up
    for proc in processes:
        proc.join()
    
    for conn in connections:
        conn.close()

if __name__ == '__main__':
    multi_connection_wait()

Communication Patterns

Producer-Consumer with Pipes

from billiard import Process, Pipe
import time

def producer(conn):
    for i in range(5):
        item = f"item_{i}"
        conn.send(item)
        print(f"Produced: {item}")
        time.sleep(0.5)
    conn.send(None)  # Signal end
    conn.close()

def consumer(conn):
    while True:
        item = conn.recv()
        if item is None:
            break
        print(f"Consumed: {item}")
        time.sleep(0.3)
    conn.close()

# Usage
parent_conn, child_conn = Pipe()
prod = Process(target=producer, args=(parent_conn,))
cons = Process(target=consumer, args=(child_conn,))
prod.start()
cons.start()
prod.join()
cons.join()

Request-Response Pattern

from billiard import Process, Pipe

def service(conn):
    while True:
        try:
            request = conn.recv()
            response = f"Processed: {request}"
            conn.send(response)
        except EOFError:
            break
    conn.close()

def client_requests(conn):
    for i in range(3):
        request = f"request_{i}"
        conn.send(request)
        response = conn.recv()
        print(f"Got response: {response}")
    conn.close()

# Usage
service_conn, client_conn = Pipe()
srv = Process(target=service, args=(service_conn,))
cli = Process(target=client_requests, args=(client_conn,))
srv.start()
cli.start()
cli.join()
srv.terminate()
srv.join()

Install with Tessl CLI

npx tessl i tessl/pypi-billiard

docs

communication.md

context-management.md

index.md

managers.md

process-management.md

process-pools.md

queues.md

shared-memory.md

synchronization.md

tile.json