0
# Inter-Process Communication
1
2
Communication mechanisms for data exchange between processes. Multiprocess provides queues for message passing and pipes for bidirectional communication, with enhanced serialization support via dill.
3
4
## Capabilities
5
6
### Queue Classes
7
8
Thread- and process-safe queues for passing objects between processes with FIFO semantics.
9
10
```python { .api }
11
class Queue:
    """
    A multi-producer, multi-consumer FIFO queue for inter-process communication.

    The ``queue.Full`` and ``queue.Empty`` exceptions mentioned below are the
    ones defined in the standard-library ``queue`` module.

    Args:
        maxsize: maximum size of the queue (0 = unlimited)
    """
    def __init__(self, maxsize=0): ...

    def put(self, item, block=True, timeout=None):
        """
        Put an item into the queue.

        Args:
            item: object to put in queue
            block: if True, block until space is available
            timeout: maximum time to wait (seconds) when blocking;
                None waits indefinitely; ignored when block is False

        Raises:
            queue.Full: if queue is full and block=False or timeout exceeded
        """

    def get(self, block=True, timeout=None):
        """
        Remove and return an item from the queue.

        Args:
            block: if True, block until item is available
            timeout: maximum time to wait (seconds) when blocking;
                None waits indefinitely; ignored when block is False

        Returns:
            object: item from queue

        Raises:
            queue.Empty: if queue is empty and block=False or timeout exceeded
        """

    def put_nowait(self, item):
        """
        Put an item without blocking (equivalent to ``put(item, False)``).

        Args:
            item: object to put in queue

        Raises:
            queue.Full: if queue is full
        """

    def get_nowait(self):
        """
        Get an item without blocking (equivalent to ``get(False)``).

        Returns:
            object: item from queue

        Raises:
            queue.Empty: if queue is empty
        """

    def empty(self):
        """
        Return True if the queue is empty.

        The answer is only a snapshot: other processes may add or remove
        items at any moment, so it must not be relied on for synchronization.

        Returns:
            bool: True if queue is empty (approximate)
        """

    def full(self):
        """
        Return True if the queue is full.

        The answer is only a snapshot: other processes may add or remove
        items at any moment, so it must not be relied on for synchronization.

        Returns:
            bool: True if queue is full (approximate)
        """

    def qsize(self):
        """
        Return the approximate size of the queue.

        Returns:
            int: approximate number of items in queue

        Raises:
            NotImplementedError: on platforms (such as macOS) where
                ``sem_getvalue()`` is not implemented
        """

    def close(self):
        """
        Indicate that no more data will be put on this queue by the current process.

        The background thread quits once it has flushed all buffered data
        to the underlying pipe.
        """

    def join_thread(self):
        """
        Join the background thread used by the queue.

        Only usable after close() has been called; blocks until the
        background thread exits, ensuring buffered data has been flushed.
        """
99
```
100
101
### JoinableQueue Class
102
103
Queue with task tracking capabilities for producer-consumer patterns.
104
105
```python { .api }
106
class JoinableQueue(Queue):
    """
    A Queue subclass that adds task tracking capabilities.

    Consumers acknowledge each retrieved item with task_done(); join()
    lets a producer wait until every item put so far has been acknowledged.

    Args:
        maxsize: maximum size of the queue (0 = unlimited)
    """
    def __init__(self, maxsize=0): ...

    def task_done(self):
        """
        Indicate that a formerly enqueued task is complete.

        Must be called once for each item retrieved from the queue.

        Raises:
            ValueError: if called more times than there were items
                placed in the queue
        """

    def join(self):
        """
        Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up when items are added and
        goes down when task_done() is called. join() returns once the
        count drops to zero.
        """
129
```
130
131
### SimpleQueue Class
132
133
Simplified queue implementation with minimal overhead.
134
135
```python { .api }
136
class SimpleQueue:
    """
    A simplified queue implementation with lower overhead.

    Offers only put/get/empty/close: no size limit, no timeouts and no
    non-blocking variants.
    """
    def __init__(self): ...

    def put(self, item):
        """
        Put an item into the queue.

        Args:
            item: object to put in queue
        """

    def get(self):
        """
        Remove and return an item from the queue, blocking until one is available.

        Returns:
            object: item from queue
        """

    def empty(self):
        """
        Return True if the queue is empty.

        Returns:
            bool: True if queue is empty
        """

    def close(self):
        """
        Close the queue and release its internal resources.

        The queue must not be used any more after it has been closed.
        """
168
```
169
170
### Pipe Communication
171
172
Two-ended communication channels between processes: bidirectional by default, or one-way when created with `duplex=False`.
173
174
```python { .api }
175
def Pipe(duplex=True):
    """
    Create a pipe between two processes.

    Args:
        duplex: if True, pipe is bidirectional; if False, unidirectional
            (the first returned connection can only receive and the
            second can only send)

    Returns:
        tuple[Connection, Connection]: pair of Connection objects
            representing the two ends of the pipe
    """
185
```
186
187
#### Connection Objects
188
189
```python { .api }
190
class Connection:
    """
    Connection object for pipe communication.

    Represents one end of a pipe (or of a Listener/Client link). It can be
    used as a context manager, which closes the connection on exit.
    """
    def send(self, obj):
        """
        Send an object through the connection.

        The object is serialized before sending; the other end reads it
        with recv().

        Args:
            obj: object to send (must be serializable)
        """

    def recv(self):
        """
        Receive an object from the connection.

        Blocks until there is something to receive.

        Returns:
            object: received object

        Raises:
            EOFError: if there is nothing left to receive and the other
                end was closed
        """

    def send_bytes(self, buffer, offset=0, size=None):
        """
        Send byte data through the connection as one complete message.

        Args:
            buffer: bytes-like object to send
            offset: starting position in buffer
            size: number of bytes to send (None = everything from offset)
        """

    def recv_bytes(self, maxlength=None):
        """
        Receive byte data from the connection as one complete message.

        Blocks until there is something to receive.

        Args:
            maxlength: maximum number of bytes to receive

        Returns:
            bytes: received byte data

        Raises:
            EOFError: if there is nothing left to receive and the other
                end was closed
            OSError: if the message is longer than maxlength (the
                connection is no longer readable afterwards)
        """

    def recv_bytes_into(self, buffer, offset=0):
        """
        Receive byte data into an existing buffer.

        Blocks until there is something to receive.

        Args:
            buffer: writable buffer to receive into
            offset: starting position in buffer

        Returns:
            int: number of bytes received

        Raises:
            EOFError: if there is nothing left to receive and the other
                end was closed
            BufferTooShort: if the buffer is too small for the message
        """

    def poll(self, timeout=0.0):
        """
        Check if data is available for reading.

        Args:
            timeout: time to wait for data (seconds); the default returns
                immediately, None waits indefinitely

        Returns:
            bool: True if data is available
        """

    def close(self):
        """Close the connection (also done automatically on garbage collection)."""

    # Properties
    readable: bool  # True if connection can receive
    writable: bool  # True if connection can send
    closed: bool    # True if connection is closed
261
```
262
263
### Advanced Connection Functions
264
265
Additional functions and classes from `multiprocess.connection` for waiting on connections and for communicating over sockets or named pipes.
266
267
```python { .api }
268
def wait(object_list, timeout=None):
    """
    Wait until one or more objects in object_list are ready.

    Args:
        object_list: list of Connection objects or other waitable objects
            (for example sockets or a Process object's ``sentinel``)
        timeout: maximum time to wait (seconds); None waits indefinitely

    Returns:
        list: subset of object_list that are ready (empty if the timeout
            expired first)
    """
279
280
class Listener:
    """
    A wrapper for a bound socket which is 'listening' for connections.

    It can be used as a context manager, which closes the listener on exit.

    Args:
        address: address to bind to (None lets an arbitrary one be chosen)
        family: socket family (default: None for auto-detection from
            the format of address)
        backlog: maximum number of pending connections
        authkey: authentication key for connections (bytes); None
            disables authentication
    """
    def __init__(self, address=None, family=None, backlog=1, authkey=None): ...

    def accept(self):
        """
        Accept a connection on the bound socket.

        Returns:
            Connection: new connection object

        Raises:
            AuthenticationError: if authentication is attempted and fails
        """

    def close(self):
        """Close the listener."""

    # Properties
    address: "tuple | str"  # Address the listener is bound to
    last_accepted: "tuple | str | None"  # Address the last accepted connection came from (None if unavailable)
306
307
def Client(address, family=None, authkey=None):
    """
    Connect to a Listener and return a Connection object.

    Args:
        address: address to connect to
        family: socket family (default: None for auto-detection from
            the format of address)
        authkey: authentication key (bytes); must match the listener's
            key, None disables authentication

    Returns:
        Connection: connection object

    Raises:
        AuthenticationError: if authentication is attempted and fails
    """
319
```
320
321
## Usage Examples
322
323
### Basic Queue Communication
324
325
```python
326
from multiprocess import Process, Queue
327
328
def producer(q, items):
    """Put every item on the queue, then a ``None`` sentinel to signal completion."""
    for item in items:
        print(f"Producing {item}")
        q.put(item)
    q.put(None)  # Signal completion


def consumer(q):
    """Print items taken from the queue until the ``None`` sentinel arrives."""
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consuming {item}")


# Guard the entry point: under the "spawn" start method (the default on
# Windows and macOS) children re-import this module, and unguarded setup code
# would try to start new processes recursively.
if __name__ == "__main__":
    # Create queue
    queue = Queue()

    # Create processes
    items_to_produce = ['item1', 'item2', 'item3', 'item4']
    prod = Process(target=producer, args=(queue, items_to_produce))
    cons = Process(target=consumer, args=(queue,))

    # Start processes
    prod.start()
    cons.start()

    # Wait for completion
    prod.join()
    cons.join()
356
```
357
358
### JoinableQueue with Task Tracking
359
360
```python
361
from multiprocess import Process, JoinableQueue
362
import time
363
364
def worker(q):
    """Process items from the queue until a ``None`` sentinel is received."""
    while True:
        item = q.get()
        try:
            if item is None:
                break
            print(f"Processing {item}")
            time.sleep(1)  # Simulate work
        finally:
            # Acknowledge every retrieved item (the sentinel included), even
            # if processing raised; otherwise q.join() would block forever.
            q.task_done()


def add_tasks(q, tasks):
    """Enqueue every task on the queue."""
    for task in tasks:
        q.put(task)


# Guard the entry point: under the "spawn" start method (the default on
# Windows and macOS) children re-import this module, and unguarded setup code
# would try to start new processes recursively.
if __name__ == "__main__":
    # Create joinable queue
    q = JoinableQueue()

    # Start worker processes
    workers = []
    for i in range(2):
        p = Process(target=worker, args=(q,))
        p.start()
        workers.append(p)

    # Add tasks
    tasks = [f"task-{i}" for i in range(5)]
    add_tasks(q, tasks)

    # Wait for all tasks to complete
    q.join()
    print("All tasks completed")

    # Stop workers: one sentinel per worker
    for _ in workers:
        q.put(None)
    for p in workers:
        p.join()
401
```
402
403
### Pipe Communication
404
405
```python
406
from multiprocess import Process, Pipe
407
408
def sender(conn, messages):
    """Send every message, then the "DONE" marker, and close this end of the pipe."""
    for msg in messages:
        print(f"Sending: {msg}")
        conn.send(msg)
    conn.send("DONE")
    conn.close()


def receiver(conn):
    """Print received messages until the "DONE" marker arrives, then close this end."""
    while True:
        msg = conn.recv()
        print(f"Received: {msg}")
        if msg == "DONE":
            break
    conn.close()


# Guard the entry point: under the "spawn" start method (the default on
# Windows and macOS) children re-import this module, and unguarded setup code
# would try to start new processes recursively.
if __name__ == "__main__":
    # Create pipe
    parent_conn, child_conn = Pipe()

    # Create processes
    messages = ["Hello", "World", "From", "Pipe"]
    p1 = Process(target=sender, args=(parent_conn, messages))
    # args must be a tuple: the trailing comma is required for a single argument
    p2 = Process(target=receiver, args=(child_conn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()
436
```
437
438
### Bidirectional Pipe Communication
439
440
```python
441
from multiprocess import Process, Pipe
442
import time
443
444
def ping_pong(conn, name, count):
    """
    Exchange ``count`` request/response rounds over a duplex connection.

    Args:
        conn: this process's end of the pipe
        name: "ping" to send first and then wait for the reply; any other
            value waits for a request first and then replies
        count: number of rounds to play
    """
    for i in range(count):
        if name == "ping":
            msg = f"ping-{i}"
            conn.send(msg)
            print(f"Sent: {msg}")
            response = conn.recv()
            print(f"Received: {response}")
        else:
            request = conn.recv()
            print(f"Received: {request}")
            msg = f"pong-{i}"
            conn.send(msg)
            print(f"Sent: {msg}")
        time.sleep(0.5)


# Guard the entry point: under the "spawn" start method (the default on
# Windows and macOS) children re-import this module, and unguarded setup code
# would try to start new processes recursively.
if __name__ == "__main__":
    # Create duplex pipe
    conn1, conn2 = Pipe(duplex=True)

    # Create processes
    p1 = Process(target=ping_pong, args=(conn1, "ping", 3))
    p2 = Process(target=ping_pong, args=(conn2, "pong", 3))

    p1.start()
    p2.start()

    p1.join()
    p2.join()
472
```
473
474
### Multiple Producers and Consumers
475
476
```python
477
from multiprocess import Process, Queue
478
import random
479
import time
480
481
def producer(q, producer_id, num_items):
    """Put ``num_items`` labelled items on the queue, pausing briefly between them."""
    for i in range(num_items):
        item = f"Producer-{producer_id}-Item-{i}"
        q.put(item)
        print(f"Produced: {item}")
        time.sleep(random.uniform(0.1, 0.5))


def consumer(q, consumer_id):
    """Consume items until the queue stays empty for two seconds."""
    # Queue.get() signals a timeout with the standard-library queue.Empty.
    from queue import Empty

    while True:
        try:
            item = q.get(timeout=2)
        except Empty:
            # Catch only the timeout so real errors are not misreported.
            print(f"Consumer-{consumer_id} timed out, exiting")
            break
        print(f"Consumer-{consumer_id} consumed: {item}")
        time.sleep(random.uniform(0.2, 0.8))


# Guard the entry point: under the "spawn" start method (the default on
# Windows and macOS) children re-import this module, and unguarded setup code
# would try to start new processes recursively.
if __name__ == "__main__":
    # Create queue
    queue = Queue()

    # Create multiple producers
    producers = []
    for i in range(2):
        p = Process(target=producer, args=(queue, i, 5))
        p.start()
        producers.append(p)

    # Create multiple consumers
    consumers = []
    for i in range(3):
        p = Process(target=consumer, args=(queue, i))
        p.start()
        consumers.append(p)

    # Wait for producers to finish
    for p in producers:
        p.join()

    # Wait for consumers to finish (they will timeout)
    for p in consumers:
        p.join()
522
```
523
524
### Enhanced Serialization Example
525
526
```python
527
from multiprocess import Process, Queue
528
import pickle
529
530
# Complex object that requires dill serialization
class ComplexObject:
    """Bundle a callable with sample data so both travel through the queue together."""

    def __init__(self, func):
        self.func = func
        self.data = [1, 2, 3, 4, 5]

    def process(self):
        """Return the result of applying ``func`` to every element of ``data``."""
        return [self.func(x) for x in self.data]


def worker(q):
    """Process objects taken from the queue until the ``None`` sentinel arrives."""
    while True:
        obj = q.get()
        if obj is None:
            break
        result = obj.process()
        print(f"Worker result: {result}")


# Guard the entry point: under the "spawn" start method (the default on
# Windows and macOS) children re-import this module, and unguarded setup code
# would try to start new processes recursively.
if __name__ == "__main__":
    # Create object with lambda function (requires dill)
    complex_obj = ComplexObject(lambda x: x ** 2)

    # Create queue and process
    queue = Queue()
    p = Process(target=worker, args=(queue,))
    p.start()

    # Send complex object (automatically serialized with dill)
    queue.put(complex_obj)
    queue.put(None)  # Signal completion

    p.join()
560
```
561
562
### Connection with Authentication
563
564
```python
565
from multiprocess import Process
566
from multiprocess.connection import Listener, Client
567
568
def server(address, authkey):
    """Accept one authenticated connection and echo messages until "quit" or EOF."""
    with Listener(address, authkey=authkey) as listener:
        print(f"Server listening on {address}")
        with listener.accept() as conn:
            print("Connection accepted")
            while True:
                try:
                    msg = conn.recv()
                    print(f"Server received: {msg}")
                    if msg == "quit":
                        break
                    conn.send(f"Echo: {msg}")
                except EOFError:
                    # The client closed its end without sending "quit".
                    break


def client(address, authkey):
    """Connect to the server, send a few messages and print each echoed reply."""
    with Client(address, authkey=authkey) as conn:
        messages = ["hello", "world", "quit"]
        for msg in messages:
            conn.send(msg)
            if msg != "quit":  # the server does not reply to "quit"
                response = conn.recv()
                print(f"Client received: {response}")


# Guard the entry point: under the "spawn" start method (the default on
# Windows and macOS) children re-import this module, and unguarded setup code
# would try to start new processes recursively.
if __name__ == "__main__":
    import time  # only the parent needs it, to delay the client start

    # Authentication key
    authkey = b'secret_key'
    address = ('localhost', 6000)

    # Start server process
    server_process = Process(target=server, args=(address, authkey))
    server_process.start()

    # Give server time to start
    time.sleep(0.5)

    # Start client process
    client_process = Process(target=client, args=(address, authkey))
    client_process.start()

    client_process.join()
    server_process.join()
610
```