0
# Communication
1
2
Inter-process communication through pipes and connections with support for both object and byte-level messaging, listeners, clients, and connection management.
3
4
## Capabilities
5
6
### Pipes
7
8
Create pairs of connected objects for bidirectional communication between processes.
9
10
```python { .api }
11
def Pipe(duplex=True, rnonblock=False, wnonblock=False) -> tuple[Connection, Connection]:
12
"""
13
Create a pair of connected Connection objects.
14
15
Parameters:
16
- duplex: if True (default), pipe is bidirectional; if False, unidirectional
17
- rnonblock: if True, read operations are non-blocking
18
- wnonblock: if True, write operations are non-blocking
19
20
Returns:
21
Tuple of (Connection, Connection) objects
22
"""
23
```
24
25
Usage example:
26
27
```python
28
from billiard import Process, Pipe
29
import time
30
31
def sender(conn, messages):
32
"""Send messages through connection"""
33
for msg in messages:
34
print(f"Sending: {msg}")
35
conn.send(msg)
36
time.sleep(0.5)
37
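conn.send(None)  # Sentinel: EOF is never seen while other processes still hold this end open
conn.close()
38
39
def receiver(conn):
40
"""Receive messages from connection until the None sentinel arrives"""
41
while True:
42
try:
43
msg = conn.recv()
if msg is None:
break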
44
print(f"Received: {msg}")
45
except EOFError:
46
break
47
conn.close()
48
49
if __name__ == '__main__':
50
# Create pipe
51
parent_conn, child_conn = Pipe()
52
53
messages = ["Hello", "World", "From", "Billiard"]
54
55
# Start processes
56
sender_proc = Process(target=sender, args=(parent_conn, messages))
57
receiver_proc = Process(target=receiver, args=(child_conn,))
58
59
sender_proc.start()
60
receiver_proc.start()
61
62
sender_proc.join()
63
receiver_proc.join()
64
```
65
66
### Connections
67
68
Connection objects provide methods for sending and receiving data between processes.
69
70
```python { .api }
71
class Connection:
72
"""
73
Connection object for inter-process communication.
74
"""
75
def send(self, obj):
76
"""
77
Send an object through the connection.
78
79
Parameters:
80
- obj: object to send (must be picklable)
81
"""
82
83
def recv(self):
84
"""
85
Receive an object from the connection.
86
87
Returns:
88
Object received from connection
89
90
Raises:
91
- EOFError: if there is nothing left to receive and the other end was closed
92
"""
93
94
def send_bytes(self, buf, offset=0, size=None):
95
"""
96
Send bytes through the connection.
97
98
Parameters:
99
- buf: bytes-like object to send
100
- offset: offset in buffer to start from
101
- size: number of bytes to send (None for all remaining)
102
"""
103
104
def recv_bytes(self, maxlength=None) -> bytes:
105
"""
106
Receive bytes from the connection.
107
108
Parameters:
109
- maxlength: maximum number of bytes to receive
110
111
Returns:
112
Bytes received from connection
113
114
Raises:
115
- EOFError: if there is nothing left to receive and the other end was closed
116
- OSError: if message too long for maxlength
117
"""
118
119
def recv_bytes_into(self, buf, offset=0) -> int:
120
"""
121
Receive bytes into an existing buffer.
122
123
Parameters:
124
- buf: writable buffer to receive into
125
- offset: offset in buffer to start writing
126
127
Returns:
128
Number of bytes received
129
"""
130
131
def poll(self, timeout=0.0) -> bool:
132
"""
133
Check if data is available for reading.
134
135
Parameters:
136
- timeout: maximum time in seconds to block (0, the default, returns immediately; None blocks until data is available)
137
138
Returns:
139
True if data is available, False otherwise
140
"""
141
142
def close(self):
143
"""
144
Close the connection.
145
"""
146
147
@property
148
def closed(self) -> bool:
149
"""True if connection is closed."""
150
151
@property
152
def readable(self) -> bool:
153
"""True if connection is readable."""
154
155
@property
156
def writable(self) -> bool:
157
"""True if connection is writable."""
158
```
159
160
Usage example:
161
162
```python
163
from billiard import Process, Pipe
164
import time
165
166
def byte_sender(conn):
167
"""Send raw bytes through connection"""
168
messages = [b"Hello", b"World", b"Bytes"]
169
170
for msg in messages:
171
print(f"Sending bytes: {msg}")
172
conn.send_bytes(msg)
173
time.sleep(0.2)
174
175
conn.close()
176
177
def byte_receiver(conn):
178
"""Receive raw bytes from connection"""
179
while True:
180
if conn.poll(timeout=1):
181
try:
182
data = conn.recv_bytes(maxlength=1024)
183
print(f"Received bytes: {data}")
184
except EOFError:
185
break
186
else:
187
print("No data available")
188
break
189
190
conn.close()
191
192
def polling_example():
193
"""Demonstrate connection polling"""
194
parent_conn, child_conn = Pipe()
195
196
# Start byte communication processes
197
sender_proc = Process(target=byte_sender, args=(parent_conn,))
198
receiver_proc = Process(target=byte_receiver, args=(child_conn,))
199
200
sender_proc.start()
201
receiver_proc.start()
202
203
sender_proc.join()
204
receiver_proc.join()
205
206
if __name__ == '__main__':
207
polling_example()
208
```
209
210
### Listeners and Clients
211
212
Server-client communication using listeners and clients for network-style IPC.
213
214
```python { .api }
215
class Listener:
216
"""
217
A listener for incoming connections.
218
"""
219
def __init__(self, address=None, family=None, backlog=1, authkey=None):
220
"""
221
Create a listener.
222
223
Parameters:
224
- address: address to bind to
225
- family: address family
226
- backlog: maximum number of pending connections
227
- authkey: authentication key
228
"""
229
230
def accept(self) -> Connection:
231
"""
232
Accept a connection and return Connection object.
233
234
Returns:
235
Connection object for accepted connection
236
"""
237
238
def close(self):
239
"""
240
Close the listener.
241
"""
242
243
@property
244
def address(self):
245
"""Address of the listener."""
246
247
@property
248
def last_accepted(self):
249
"""Address of last accepted connection."""
250
251
def Client(address, family=None, authkey=None) -> Connection:
252
"""
253
Create a client connection.
254
255
Parameters:
256
- address: address to connect to
257
- family: address family
258
- authkey: authentication key
259
260
Returns:
261
Connection object
262
"""
263
```
264
265
Usage example:
266
267
```python
268
from billiard import Process
269
from billiard.connection import Listener, Client
270
import time
271
272
def server_process(address):
273
"""Server that accepts connections"""
274
with Listener(address, authkey=b'secret') as listener:
275
print(f"Server listening on {listener.address}")
276
277
# Accept multiple connections
278
for i in range(3):
279
print(f"Waiting for connection {i+1}...")
280
conn = listener.accept()
281
print(f"Connection {i+1} from {listener.last_accepted}")
282
283
# Handle client
284
try:
285
while True:
286
msg = conn.recv()
287
print(f"Server received: {msg}")
288
conn.send(f"Echo: {msg}")
289
except EOFError:
290
print(f"Client {i+1} disconnected")
291
finally:
292
conn.close()
293
294
def client_process(address, client_id):
295
"""Client that connects to server"""
296
try:
297
conn = Client(address, authkey=b'secret')
298
print(f"Client {client_id} connected")
299
300
# Send messages
301
for i in range(3):
302
msg = f"Message {i} from client {client_id}"
303
conn.send(msg)
304
response = conn.recv()
305
print(f"Client {client_id} got response: {response}")
306
time.sleep(0.5)
307
308
conn.close()
309
print(f"Client {client_id} finished")
310
311
except Exception as e:
312
print(f"Client {client_id} error: {e}")
313
314
if __name__ == '__main__':
315
# Use a Unix domain socket on Unix or a TCP socket on localhost on Windows
316
import sys
317
if sys.platform == 'win32':
318
address = ('localhost', 6000)
319
else:
320
address = '/tmp/billiard_socket'
321
322
# Start server
323
server_proc = Process(target=server_process, args=(address,))
324
server_proc.start()
325
326
time.sleep(0.5) # Let server start
327
328
# Start clients
329
clients = []
330
for i in range(3):
331
client_proc = Process(target=client_process, args=(address, i))
332
clients.append(client_proc)
333
client_proc.start()
334
335
# Wait for completion
336
for client_proc in clients:
337
client_proc.join()
338
339
server_proc.join()
340
```
341
342
### Connection Utilities
343
344
Utility functions for working with multiple connections.
345
346
```python { .api }
347
def wait(object_list, timeout=None) -> list:
348
"""
349
Wait until one or more connections/objects are ready.
350
351
Parameters:
352
- object_list: list of Connection objects or other waitable objects
353
- timeout: timeout in seconds (None for no timeout)
354
355
Returns:
356
List of objects that are ready for reading
357
"""
358
```
359
360
Usage example:
361
362
```python
363
from billiard import Process, Pipe
364
from billiard.connection import wait
365
import time
366
import random
367
368
def delayed_sender(conn, delay, message):
369
"""Send message after delay"""
370
time.sleep(delay)
371
conn.send(message)
372
conn.close()
373
374
def multi_connection_wait():
375
"""Demonstrate waiting on multiple connections"""
376
connections = []
377
processes = []
378
379
# Create multiple pipes with different delays
380
for i in range(4):
381
parent_conn, child_conn = Pipe()
382
connections.append(parent_conn)
383
384
delay = random.uniform(0.5, 2.0)
385
message = f"Message from connection {i}"
386
387
proc = Process(target=delayed_sender, args=(child_conn, delay, message))
388
processes.append(proc)
389
proc.start()
390
391
print("Waiting for messages from multiple connections...")
392
393
# Wait for connections to become ready
394
ready_count = 0
395
while connections:
396
ready = wait(connections, timeout=3.0)
397
398
if ready:
399
print(f"{len(ready)} connections ready")
400
for conn in ready:
401
try:
402
msg = conn.recv()
403
print(f"Received: {msg}")
404
ready_count += 1
405
connections.remove(conn)
406
except EOFError:
407
pass
408
else:
409
print("Timeout waiting for connections")
410
break
411
412
# Clean up
413
for proc in processes:
414
proc.join()
415
416
for conn in connections:
417
conn.close()
418
419
if __name__ == '__main__':
420
multi_connection_wait()
421
```
422
423
## Communication Patterns
424
425
### Producer-Consumer with Pipes
426
427
```python
428
from billiard import Process, Pipe
429
import time
430
431
def producer(conn):
432
for i in range(5):
433
item = f"item_{i}"
434
conn.send(item)
435
print(f"Produced: {item}")
436
time.sleep(0.5)
437
conn.send(None) # Signal end
438
conn.close()
439
440
def consumer(conn):
441
while True:
442
item = conn.recv()
443
if item is None:
444
break
445
print(f"Consumed: {item}")
446
time.sleep(0.3)
447
conn.close()
448
449
# Usage (place under `if __name__ == '__main__':` when the spawn start method is used, e.g. on Windows)
450
parent_conn, child_conn = Pipe()
451
prod = Process(target=producer, args=(parent_conn,))
452
cons = Process(target=consumer, args=(child_conn,))
453
prod.start()
454
cons.start()
455
prod.join()
456
cons.join()
457
```
458
459
### Request-Response Pattern
460
461
```python
462
from billiard import Process, Pipe
463
464
def service(conn):
465
while True:
466
try:
467
request = conn.recv()
468
response = f"Processed: {request}"
469
conn.send(response)
470
except EOFError:
471
break
472
conn.close()
473
474
def client_requests(conn):
475
for i in range(3):
476
request = f"request_{i}"
477
conn.send(request)
478
response = conn.recv()
479
print(f"Got response: {response}")
480
conn.close()
481
482
# Usage (place under `if __name__ == '__main__':` when the spawn start method is used, e.g. on Windows)
483
service_conn, client_conn = Pipe()
484
srv = Process(target=service, args=(service_conn,))
485
cli = Process(target=client_requests, args=(client_conn,))
486
srv.start()
487
cli.start()
488
cli.join()
489
srv.terminate()
490
srv.join()
491
```