Python multiprocessing fork with improvements and bugfixes for distributed task processing
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Inter-process communication through pipes and connections with support for both object and byte-level messaging, listeners, clients, and connection management.
Create pairs of connected objects for bidirectional communication between processes.
def Pipe(duplex=True, rnonblock=False, wnonblock=False) -> tuple[Connection, Connection]:
    """
    Build two Connection objects that are connected to each other.

    Parameters:
    - duplex: True (the default) makes the pipe bidirectional; False makes it unidirectional
    - rnonblock: when True, read operations do not block
    - wnonblock: when True, write operations do not block

    Returns:
    A (Connection, Connection) tuple
    """


# Usage example:
from billiard import Process, Pipe
import time
def sender(conn, messages):
    """
    Send messages through connection, then signal the end of the stream.

    Parameters:
    - conn: connection to write to
    - messages: iterable of picklable objects to send
    """
    for msg in messages:
        print(f"Sending: {msg}")
        conn.send(msg)
        time.sleep(0.5)
    # recv() on the other side only raises EOFError once *every* handle to
    # this end has been closed. The parent process still holds one, so an
    # explicit end marker is sent instead of relying on close() alone.
    conn.send(None)
    conn.close()


def receiver(conn):
    """
    Receive and print messages until the end marker (None) or EOF.

    Parameters:
    - conn: connection to read from
    """
    while True:
        try:
            msg = conn.recv()
        except EOFError:
            break
        if msg is None:
            # End marker sent by sender()
            break
        print(f"Received: {msg}")
    conn.close()


if __name__ == '__main__':
    # Create pipe
    parent_conn, child_conn = Pipe()
    messages = ["Hello", "World", "From", "Billiard"]
    # Start processes
    sender_proc = Process(target=sender, args=(parent_conn, messages))
    # args must be a tuple, hence the trailing comma
    receiver_proc = Process(target=receiver, args=(child_conn,))
    sender_proc.start()
    receiver_proc.start()
    # The children work on their own handles; release the parent's copies
    parent_conn.close()
    child_conn.close()
    sender_proc.join()
    receiver_proc.join()


# Connection objects provide methods for sending and receiving data between processes.
class Connection:
    """
    Connection object for inter-process communication.
    """

    def send(self, obj):
        """
        Send an object through the connection.

        Parameters:
        - obj: object to send (must be picklable)
        """

    def recv(self):
        """
        Receive an object from the connection.

        Returns:
        Object received from connection

        Raises:
        - EOFError: if connection is closed
        """

    def send_bytes(self, buf, offset=0, size=None):
        """
        Send bytes through the connection.

        Parameters:
        - buf: bytes-like object to send
        - offset: offset in buffer to start from
        - size: number of bytes to send (None for all remaining)
        """

    def recv_bytes(self, maxlength=None) -> bytes:
        """
        Receive bytes from the connection.

        Parameters:
        - maxlength: maximum number of bytes to receive

        Returns:
        Bytes received from connection

        Raises:
        - EOFError: if connection is closed
        - OSError: if message too long for maxlength
        """

    def recv_bytes_into(self, buf, offset=0) -> int:
        """
        Receive bytes into an existing buffer.

        Parameters:
        - buf: writable buffer to receive into
        - offset: offset in buffer to start writing

        Returns:
        Number of bytes received
        """

    def poll(self, timeout=0.0) -> bool:
        """
        Check if data is available for reading.

        Parameters:
        - timeout: maximum time in seconds to block; 0.0 (the default)
          returns immediately, None blocks indefinitely until data arrives

        Returns:
        True if data is available, False otherwise
        """

    def close(self):
        """
        Close the connection.
        """

    @property
    def closed(self) -> bool:
        """True if connection is closed."""

    @property
    def readable(self) -> bool:
        """True if connection is readable."""

    @property
    def writable(self) -> bool:
        """True if connection is writable."""


# Usage example:
from billiard import Process, Pipe
import time
def byte_sender(conn):
    """Write three fixed byte strings to the connection, then close it"""
    for msg in (b"Hello", b"World", b"Bytes"):
        print(f"Sending bytes: {msg}")
        conn.send_bytes(msg)
        # Short pause between consecutive messages
        time.sleep(0.2)
    conn.close()
def byte_receiver(conn):
    """Print raw byte messages until the connection goes quiet or reaches EOF"""
    while True:
        # Give up once nothing shows up within one second
        if not conn.poll(timeout=1):
            print("No data available")
            break
        try:
            data = conn.recv_bytes(maxlength=1024)
        except EOFError:
            break
        print(f"Received bytes: {data}")
    conn.close()
def polling_example():
    """Run byte_sender and byte_receiver in two processes joined by a pipe"""
    parent_conn, child_conn = Pipe()
    # Sender first, receiver second; started and joined in that order
    workers = [
        Process(target=byte_sender, args=(parent_conn,)),
        Process(target=byte_receiver, args=(child_conn,)),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
if __name__ == "__main__":
    # Run the demo only when executed as a script
    polling_example()


# Server-client communication using listeners and clients for network-style IPC.
class Listener:
    """
    A listener for incoming connections.

    Can be used as a context manager: the listener is closed when the
    ``with`` block exits.
    """

    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        """
        Create a listener.

        Parameters:
        - address: address to bind to
        - family: address family
        - backlog: maximum number of pending connections
        - authkey: authentication key
        """

    def accept(self) -> Connection:
        """
        Accept a connection and return Connection object.

        Returns:
        Connection object for accepted connection
        """

    def close(self):
        """
        Close the listener.
        """

    @property
    def address(self):
        """Address of the listener."""

    @property
    def last_accepted(self):
        """Address of last accepted connection."""

    def __enter__(self):
        """Return the listener itself so it can be bound with ``as``."""
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Close the listener on leaving the ``with`` block."""
        self.close()
def Client(address, family=None, authkey=None) -> Connection:
    """
    Establish a client connection.

    Parameters:
    - address: the address to connect to
    - family: the address family
    - authkey: the authentication key

    Returns:
    A Connection object
    """


# Usage example:
from billiard import Process
from billiard.connection import Listener, Client
import time
def server_process(address):
    """Echo server: serves three clients, one after another"""

    def echo_until_eof(conn):
        # Leaves only by way of the EOFError raised from recv()
        while True:
            msg = conn.recv()
            print(f"Server received: {msg}")
            conn.send(f"Echo: {msg}")

    with Listener(address, authkey=b'secret') as listener:
        print(f"Server listening on {listener.address}")
        # One pass per expected client
        for i in range(3):
            print(f"Waiting for connection {i+1}...")
            conn = listener.accept()
            print(f"Connection {i+1} from {listener.last_accepted}")
            # Serve this client until it hangs up
            try:
                echo_until_eof(conn)
            except EOFError:
                print(f"Client {i+1} disconnected")
            finally:
                conn.close()
def client_process(address, client_id):
    """
    Client that connects to server, sends three messages and prints the replies.

    Parameters:
    - address: address of the listener to connect to
    - client_id: identifier used in the messages and in the printed output
    """
    try:
        conn = Client(address, authkey=b'secret')
        # Close the connection even when send/recv fails part-way through
        try:
            print(f"Client {client_id} connected")
            # Send messages
            for i in range(3):
                msg = f"Message {i} from client {client_id}"
                conn.send(msg)
                response = conn.recv()
                print(f"Client {client_id} got response: {response}")
                time.sleep(0.5)
        finally:
            conn.close()
        print(f"Client {client_id} finished")
    except Exception as e:
        # Top-level boundary of the client process: report instead of crashing
        print(f"Client {client_id} error: {e}")
if __name__ == '__main__':
    import sys

    # Platform-appropriate address: a (host, port) pair on Windows,
    # a filesystem path everywhere else
    address = ('localhost', 6000) if sys.platform == 'win32' else '/tmp/billiard_socket'
    # Launch the server first
    server_proc = Process(target=server_process, args=(address,))
    server_proc.start()
    time.sleep(0.5)  # Let server start
    # Launch three clients
    clients = []
    for i in range(3):
        client_proc = Process(target=client_process, args=(address, i))
        clients.append(client_proc)
        client_proc.start()
    # Clients finish first, then the server
    for client_proc in clients:
        client_proc.join()
    server_proc.join()


# Utility functions for working with multiple connections.
def wait(object_list, timeout=None) -> list:
    """
    Block until at least one of the given connections/objects is ready.

    Parameters:
    - object_list: list of Connection objects or other waitable objects
    - timeout: timeout in seconds (None means no timeout)

    Returns:
    The objects that are ready for reading, as a list
    """


# Usage example:
from billiard import Process, Pipe
from billiard.connection import wait
import time
import random
def delayed_sender(conn, delay, message):
    """Sleep for `delay` seconds, send `message`, then close the connection"""
    time.sleep(delay)
    conn.send(message)
    # Nothing more to send from this side
    conn.close()
def multi_connection_wait():
    """
    Demonstrate waiting on multiple connections.

    Starts four sender processes, each with its own pipe and a random delay,
    and collects one message from every pipe as it becomes ready. Stops early
    if nothing becomes ready within the timeout.
    """
    pending = []  # parent ends that have not delivered their message yet
    processes = []
    # Create multiple pipes with different delays
    for i in range(4):
        parent_conn, child_conn = Pipe()
        pending.append(parent_conn)
        delay = random.uniform(0.5, 2.0)
        message = f"Message from connection {i}"
        proc = Process(target=delayed_sender, args=(child_conn, delay, message))
        processes.append(proc)
        proc.start()
        # The child works on its own handle; release the parent's copy
        child_conn.close()
    print("Waiting for messages from multiple connections...")
    # Loop on the remaining connections themselves. Comparing a growing
    # counter against a shrinking list would stop after about half of them.
    while pending:
        ready = wait(pending, timeout=3.0)
        if not ready:
            print("Timeout waiting for connections")
            break
        print(f"{len(ready)} connections ready")
        for conn in ready:
            try:
                msg = conn.recv()
            except EOFError:
                # Closed without a message; it is dropped below so that
                # wait() does not keep reporting it as ready forever
                pass
            else:
                print(f"Received: {msg}")
            pending.remove(conn)
            conn.close()
    # Clean up
    for proc in processes:
        proc.join()
    # Only non-empty after a timeout
    for conn in pending:
        conn.close()
if __name__ == "__main__":
    # Run the demo only when executed as a script
    multi_connection_wait()


from billiard import Process, Pipe
import time
def producer(conn):
    """Send item_0 .. item_4, pausing after each, then a None end marker"""
    for item in (f"item_{i}" for i in range(5)):
        conn.send(item)
        print(f"Produced: {item}")
        time.sleep(0.5)
    # None tells the consumer that nothing more will follow
    conn.send(None)
    conn.close()
def consumer(conn):
    """Print received items until the None end marker arrives, then close"""
    # A received None ends the loop
    while (item := conn.recv()) is not None:
        print(f"Consumed: {item}")
        time.sleep(0.3)
    conn.close()
# Usage
if __name__ == '__main__':
    # Processes are created under the main guard so that a child which
    # re-imports this module does not start the processes again.
    parent_conn, child_conn = Pipe()
    prod = Process(target=producer, args=(parent_conn,))
    cons = Process(target=consumer, args=(child_conn,))
    prod.start()
    cons.start()
    prod.join()
    cons.join()


from billiard import Process, Pipe
def service(conn):
    """Answer every request with "Processed: <request>" until EOF, then close"""
    try:
        while True:
            request = conn.recv()
            conn.send(f"Processed: {request}")
    except EOFError:
        # Raised when the request stream ends; nothing more to serve
        pass
    conn.close()
def client_requests(conn):
    """Send request_0 .. request_2, printing the reply to each, then close"""
    for i in range(3):
        conn.send(f"request_{i}")
        # Each request is answered before the next one goes out
        response = conn.recv()
        print(f"Got response: {response}")
    conn.close()
# Usage
if __name__ == '__main__':
    # Processes are created under the main guard so that a child which
    # re-imports this module does not start the processes again.
    service_conn, client_conn = Pipe()
    srv = Process(target=service, args=(service_conn,))
    cli = Process(target=client_requests, args=(client_conn,))
    srv.start()
    cli.start()
    cli.join()
    # service() only returns on EOFError, so the service process is
    # stopped explicitly once the client is done.
    srv.terminate()
    srv.join()


# Install with Tessl CLI
npx tessl i tessl/pypi-billiard