A fork of Python's multiprocessing module that extends multiprocessing to provide enhanced serialization using dill.

Communication mechanisms for data exchange between processes. Multiprocess provides queues for message passing and pipes for bidirectional communication, with enhanced serialization support via dill.
Thread-safe queues for passing objects between processes with FIFO semantics.
class Queue:
    """
    A multi-producer, multi-consumer FIFO queue for inter-process communication.

    Args:
        maxsize: maximum size of the queue (0 = unlimited)
    """
    def __init__(self, maxsize=0): ...

    def put(self, item, block=True, timeout=None):
        """
        Put an item into the queue.

        Args:
            item: object to put in queue
            block: if True, block until space is available
            timeout: maximum time to wait (seconds)

        Raises:
            queue.Full: if queue is full and block=False or timeout exceeded
        """

    def get(self, block=True, timeout=None):
        """
        Remove and return an item from the queue.

        Args:
            block: if True, block until item is available
            timeout: maximum time to wait (seconds)

        Returns:
            object: item from queue

        Raises:
            queue.Empty: if queue is empty and block=False or timeout exceeded
        """

    def put_nowait(self, item):
        """
        Put an item without blocking.

        Args:
            item: object to put in queue

        Raises:
            queue.Full: if queue is full
        """

    def get_nowait(self):
        """
        Get an item without blocking.

        Returns:
            object: item from queue

        Raises:
            queue.Empty: if queue is empty
        """

    def empty(self):
        """
        Return True if the queue is empty.

        Returns:
            bool: True if queue is empty (approximate)
        """

    def full(self):
        """
        Return True if the queue is full.

        Returns:
            bool: True if queue is full (approximate)
        """

    def qsize(self):
        """
        Return the approximate size of the queue.

        Returns:
            int: approximate number of items in queue
        """

    def close(self):
        """Indicate that no more data will be put on this queue."""

    def join_thread(self):
        """Join the background thread used by the queue."""

# Queue with task tracking capabilities for producer-consumer patterns.
class JoinableQueue(Queue):
    """
    A Queue subclass that adds task tracking capabilities.

    Args:
        maxsize: maximum size of the queue (0 = unlimited)
    """
    def __init__(self, maxsize=0): ...

    def task_done(self):
        """
        Indicate that a formerly enqueued task is complete.

        Must be called once for each item retrieved from the queue.
        """

    def join(self):
        """
        Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up when items are added and
        goes down when task_done() is called.
        """

# Simplified queue implementation with minimal overhead.
class SimpleQueue:
    """
    A simplified queue implementation with lower overhead.
    """
    def __init__(self): ...

    def put(self, item):
        """
        Put an item into the queue.

        Args:
            item: object to put in queue
        """

    def get(self):
        """
        Remove and return an item from the queue.

        Returns:
            object: item from queue
        """

    def empty(self):
        """
        Return True if the queue is empty.

        Returns:
            bool: True if queue is empty
        """

    def close(self):
        """Indicate that no more data will be put on this queue."""

# Bidirectional communication channels between processes.
def Pipe(duplex=True):
    """
    Create a pipe between two processes.

    Args:
        duplex: if True, pipe is bidirectional; if False, unidirectional

    Returns:
        tuple[Connection, Connection]: pair of Connection objects
    """

class Connection:
    """
    Connection object for pipe communication.
    """
    def send(self, obj):
        """
        Send an object through the connection.

        Args:
            obj: object to send
        """

    def recv(self):
        """
        Receive an object from the connection.

        Returns:
            object: received object
        """

    def send_bytes(self, buffer, offset=0, size=None):
        """
        Send byte data through the connection.

        Args:
            buffer: bytes-like object to send
            offset: starting position in buffer
            size: number of bytes to send
        """

    def recv_bytes(self, maxlength=None):
        """
        Receive byte data from the connection.

        Args:
            maxlength: maximum number of bytes to receive

        Returns:
            bytes: received byte data
        """

    def recv_bytes_into(self, buffer, offset=0):
        """
        Receive byte data into an existing buffer.

        Args:
            buffer: writable buffer to receive into
            offset: starting position in buffer

        Returns:
            int: number of bytes received
        """

    def poll(self, timeout=0.0):
        """
        Check if data is available for reading.

        Args:
            timeout: time to wait for data (seconds)

        Returns:
            bool: True if data is available
        """

    def close(self):
        """Close the connection."""

    # Properties
    readable: bool  # True if connection can receive
    writable: bool  # True if connection can send
    closed: bool    # True if connection is closed

# Additional functions for working with connections and sockets.
def wait(object_list, timeout=None):
    """
    Wait until one or more objects in object_list are ready.

    Args:
        object_list: list of Connection objects or other waitable objects
        timeout: maximum time to wait (seconds)

    Returns:
        list: subset of object_list that are ready
    """

class Listener:
    """
    A wrapper for a bound socket which is 'listening' for connections.

    Args:
        address: address to bind to
        family: socket family (default: None for auto-detection)
        backlog: maximum number of pending connections
        authkey: authentication key for connections
    """
    def __init__(self, address=None, family=None, backlog=1, authkey=None): ...

    def accept(self):
        """
        Accept a connection on the bound socket.

        Returns:
            Connection: new connection object
        """

    def close(self):
        """Close the listener."""

    # Properties
    address: tuple               # Address the listener is bound to
    last_accepted: Connection    # Last accepted connection

def Client(address, family=None, authkey=None):
    """
    Connect to a Listener and return a Connection object.

    Args:
        address: address to connect to
        family: socket family (default: None for auto-detection)
        authkey: authentication key

    Returns:
        Connection: connection object
    """

from multiprocess import Process, Queue
def producer(q, items):
    """Put each item on the queue, then a None sentinel to signal completion."""
    for item in items:
        print(f"Producing {item}")
        q.put(item)
    q.put(None)  # Signal completion

def consumer(q):
    """Consume items from the queue until the None sentinel is received."""
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consuming {item}")

# NOTE(review): this example has no `if __name__ == "__main__":` guard; one is
# required on platforms that use the "spawn" start method (e.g. Windows).
# Create queue
queue = Queue()

# Create processes
items_to_produce = ['item1', 'item2', 'item3', 'item4']
prod = Process(target=producer, args=(queue, items_to_produce))
cons = Process(target=consumer, args=(queue,))

# Start processes
prod.start()
cons.start()

# Wait for completion
prod.join()
cons.join()

from multiprocess import Process, JoinableQueue
import time

def worker(q):
    """Process items from the joinable queue; None is the shutdown sentinel."""
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Processing {item}")
        time.sleep(1)  # Simulate work
        q.task_done()  # Mark this item as finished so q.join() can return

def add_tasks(q, tasks):
    """Enqueue every task onto the queue."""
    for task in tasks:
        q.put(task)

# Create joinable queue
q = JoinableQueue()

# Start worker processes
workers = []
for i in range(2):
    p = Process(target=worker, args=(q,))
    p.start()
    workers.append(p)

# Add tasks
tasks = [f"task-{i}" for i in range(5)]
for task in tasks:
    q.put(task)

# Wait for all tasks to complete
q.join()
print("All tasks completed")

# Stop workers (one sentinel per worker; q.join() already returned, so the
# sentinels never needing task_done() is harmless)
for _ in workers:
    q.put(None)
for p in workers:
    p.join()

from multiprocess import Process, Pipe
def sender(conn, messages):
    """Send each message, then the "DONE" marker, and close the connection."""
    for msg in messages:
        print(f"Sending: {msg}")
        conn.send(msg)
    conn.send("DONE")
    conn.close()

def receiver(conn):
    """Receive messages until the "DONE" marker arrives, then close."""
    while True:
        msg = conn.recv()
        print(f"Received: {msg}")
        if msg == "DONE":
            break
    conn.close()

# Create pipe
parent_conn, child_conn = Pipe()

# Create processes
messages = ["Hello", "World", "From", "Pipe"]
p1 = Process(target=sender, args=(parent_conn, messages))
# BUG FIX: was args=(child_conn) — without the trailing comma that is not a
# tuple, and Process would try to iterate the Connection as its args.
p2 = Process(target=receiver, args=(child_conn,))
p1.start()
p2.start()
p1.join()
p2.join()

from multiprocess import Process, Pipe
import time

def ping_pong(conn, name, count):
    """Exchange `count` request/response pairs over a duplex connection.

    The "ping" side sends first then waits for a reply; any other name
    waits for a request then replies.
    """
    for i in range(count):
        if name == "ping":
            msg = f"ping-{i}"
            conn.send(msg)
            print(f"Sent: {msg}")
            response = conn.recv()
            print(f"Received: {response}")
        else:
            request = conn.recv()
            print(f"Received: {request}")
            msg = f"pong-{i}"
            conn.send(msg)
            print(f"Sent: {msg}")
        time.sleep(0.5)

# Create duplex pipe
conn1, conn2 = Pipe(duplex=True)

# Create processes
p1 = Process(target=ping_pong, args=(conn1, "ping", 3))
p2 = Process(target=ping_pong, args=(conn2, "pong", 3))
p1.start()
p2.start()
p1.join()
p2.join()

from multiprocess import Process, Queue
import random
import time

def producer(q, producer_id, num_items):
    """Produce `num_items` labelled items with random pacing."""
    for i in range(num_items):
        item = f"Producer-{producer_id}-Item-{i}"
        q.put(item)
        print(f"Produced: {item}")
        time.sleep(random.uniform(0.1, 0.5))

def consumer(q, consumer_id):
    """Consume items until a 2-second get() timeout signals no more work."""
    while True:
        try:
            item = q.get(timeout=2)
            print(f"Consumer-{consumer_id} consumed: {item}")
            time.sleep(random.uniform(0.2, 0.8))
        # BUG FIX: was a bare `except:`, which also swallows SystemExit and
        # KeyboardInterrupt. q.get(timeout=...) raises queue.Empty on timeout;
        # Exception covers that without masking interpreter-exit signals.
        except Exception:
            print(f"Consumer-{consumer_id} timed out, exiting")
            break

# Create queue
queue = Queue()

# Create multiple producers
producers = []
for i in range(2):
    p = Process(target=producer, args=(queue, i, 5))
    p.start()
    producers.append(p)

# Create multiple consumers
consumers = []
for i in range(3):
    p = Process(target=consumer, args=(queue, i))
    p.start()
    consumers.append(p)

# Wait for producers to finish
for p in producers:
    p.join()

# Wait for consumers to finish (they will timeout)
for p in consumers:
    p.join()

from multiprocess import Process, Queue
# NOTE(review): `pickle` appears unused below; presumably imported to contrast
# with dill (standard pickle cannot serialize lambdas) — confirm before removing.
import pickle

# Complex object that requires dill serialization
class ComplexObject:
    """Holds an arbitrary callable (e.g. a lambda) applied over fixed data."""

    def __init__(self, func):
        self.func = func
        self.data = [1, 2, 3, 4, 5]

    def process(self):
        """Apply the stored callable to each element of `data`."""
        return [self.func(x) for x in self.data]

def worker(q):
    """Process ComplexObject instances from the queue; None stops the loop."""
    while True:
        obj = q.get()
        if obj is None:
            break
        result = obj.process()
        print(f"Worker result: {result}")

# Create object with lambda function (requires dill)
complex_obj = ComplexObject(lambda x: x ** 2)

# Create queue and process
queue = Queue()
p = Process(target=worker, args=(queue,))
p.start()

# Send complex object (automatically serialized with dill)
queue.put(complex_obj)
queue.put(None)  # Signal completion
p.join()

from multiprocess import Process
from multiprocess.connection import Listener, Client

def server(address, authkey):
    """Accept one client and echo messages back until "quit" or EOF."""
    with Listener(address, authkey=authkey) as listener:
        print(f"Server listening on {address}")
        with listener.accept() as conn:
            print("Connection accepted")
            while True:
                try:
                    msg = conn.recv()
                    print(f"Server received: {msg}")
                    if msg == "quit":
                        break
                    conn.send(f"Echo: {msg}")
                except EOFError:  # peer closed the connection
                    break

def client(address, authkey):
    """Send a fixed message sequence, reading an echo for all but "quit"."""
    with Client(address, authkey=authkey) as conn:
        messages = ["hello", "world", "quit"]
        for msg in messages:
            conn.send(msg)
            if msg != "quit":
                response = conn.recv()
                print(f"Client received: {response}")

# Authentication key
authkey = b'secret_key'
address = ('localhost', 6000)

# Start server process
server_process = Process(target=server, args=(address, authkey))
server_process.start()

# Give server time to start
import time
time.sleep(0.5)

# Start client process
client_process = Process(target=client, args=(address, authkey))
client_process.start()
client_process.join()
server_process.join()

# Install with Tessl CLI
Install with the Tessl CLI: `npx tessl i tessl/pypi-multiprocess`