or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

communication.md, context-management.md, index.md, managers.md, process-management.md, process-pools.md, queues.md, shared-memory.md, synchronization.md

docs/communication.md

0

# Communication

1

2

Inter-process communication through pipes and connections with support for both object and byte-level messaging, listeners, clients, and connection management.

3

4

## Capabilities

5

6

### Pipes

7

8

Create a pair of connected Connection objects for communication between processes (bidirectional by default, optionally unidirectional).

9

10

```python { .api }

11

def Pipe(duplex=True, rnonblock=False, wnonblock=False) -> tuple[Connection, Connection]:

12

"""

13

Create a pair of connected Connection objects.

14

15

Parameters:

16

- duplex: if True (default), pipe is bidirectional; if False, unidirectional (the first connection can only receive and the second can only send)

17

- rnonblock: if True, read operations are non-blocking

18

- wnonblock: if True, write operations are non-blocking

19

20

Returns:

21

Tuple of (Connection, Connection) objects

22

"""

23

```

24

25

Usage example:

26

27

```python

28

from billiard import Process, Pipe

29

import time

30

31

def sender(conn, messages):

32

"""Send messages through connection"""

33

for msg in messages:

34

print(f"Sending: {msg}")

35

conn.send(msg)

36

time.sleep(0.5)

37

conn.close()

38

39

def receiver(conn):

40

"""Receive messages from connection"""

41

while True:

42

try:

43

msg = conn.recv()

44

print(f"Received: {msg}")

45

except EOFError:

46

break

47

conn.close()

48

49

if __name__ == '__main__':

50

# Create pipe

51

parent_conn, child_conn = Pipe()

52

53

messages = ["Hello", "World", "From", "Billiard"]

54

55

# Start processes

56

sender_proc = Process(target=sender, args=(parent_conn, messages))

57

receiver_proc = Process(target=receiver, args=(child_conn,))

58

59

sender_proc.start()

60

receiver_proc.start()

61

62

sender_proc.join()

63

receiver_proc.join()

64

```

65

66

### Connections

67

68

Connection objects provide methods for sending and receiving data between processes.

69

70

```python { .api }

71

class Connection:

72

"""

73

Connection object for inter-process communication.

74

"""

75

def send(self, obj):

76

"""

77

Send an object through the connection.

78

79

Parameters:

80

- obj: object to send (must be picklable)

81

"""

82

83

def recv(self):

84

"""

85

Receive an object from the connection.

86

87

Returns:

88

Object received from connection

89

90

Raises:

91

- EOFError: if connection is closed

92

"""

93

94

def send_bytes(self, buf, offset=0, size=None):

95

"""

96

Send bytes through the connection.

97

98

Parameters:

99

- buf: bytes-like object to send

100

- offset: offset in buffer to start from

101

- size: number of bytes to send (None for all remaining)

102

"""

103

104

def recv_bytes(self, maxlength=None) -> bytes:

105

"""

106

Receive bytes from the connection.

107

108

Parameters:

109

- maxlength: maximum number of bytes to receive

110

111

Returns:

112

Bytes received from connection

113

114

Raises:

115

- EOFError: if connection is closed

116

- OSError: if message too long for maxlength

117

"""

118

119

def recv_bytes_into(self, buf, offset=0) -> int:

120

"""

121

Receive bytes into an existing buffer.

122

123

Parameters:

124

- buf: writable buffer to receive into

125

- offset: offset in buffer to start writing

126

127

Returns:

128

Number of bytes received

129

"""

130

131

def poll(self, timeout=0.0) -> bool:

132

"""

133

Check if data is available for reading.

134

135

Parameters:

136

- timeout: maximum time to block in seconds (default 0.0 returns immediately; None blocks until data is available)

137

138

Returns:

139

True if data is available, False otherwise

140

"""

141

142

def close(self):

143

"""

144

Close the connection.

145

"""

146

147

@property

148

def closed(self) -> bool:

149

"""True if connection is closed."""

150

151

@property

152

def readable(self) -> bool:

153

"""True if connection is readable."""

154

155

@property

156

def writable(self) -> bool:

157

"""True if connection is writable."""

158

```

159

160

Usage example:

161

162

```python

163

from billiard import Process, Pipe

164

import time

165

166

def byte_sender(conn):

167

"""Send raw bytes through connection"""

168

messages = [b"Hello", b"World", b"Bytes"]

169

170

for msg in messages:

171

print(f"Sending bytes: {msg}")

172

conn.send_bytes(msg)

173

time.sleep(0.2)

174

175

conn.close()

176

177

def byte_receiver(conn):

178

"""Receive raw bytes from connection"""

179

while True:

180

if conn.poll(timeout=1):

181

try:

182

data = conn.recv_bytes(maxlength=1024)

183

print(f"Received bytes: {data}")

184

except EOFError:

185

break

186

else:

187

print("No data available")

188

break

189

190

conn.close()

191

192

def polling_example():

193

"""Demonstrate connection polling"""

194

parent_conn, child_conn = Pipe()

195

196

# Start byte communication processes

197

sender_proc = Process(target=byte_sender, args=(parent_conn,))

198

receiver_proc = Process(target=byte_receiver, args=(child_conn,))

199

200

sender_proc.start()

201

receiver_proc.start()

202

203

sender_proc.join()

204

receiver_proc.join()

205

206

if __name__ == '__main__':

207

polling_example()

208

```

209

210

### Listeners and Clients

211

212

Server-client communication using listeners and clients for network-style IPC.

213

214

```python { .api }

215

class Listener:

216

"""

217

A listener for incoming connections.

218

"""

219

def __init__(self, address=None, family=None, backlog=1, authkey=None):

220

"""

221

Create a listener.

222

223

Parameters:

224

- address: address to bind to

225

- family: address family

226

- backlog: maximum number of pending connections

227

- authkey: authentication key

228

"""

229

230

def accept(self) -> Connection:

231

"""

232

Accept a connection and return Connection object.

233

234

Returns:

235

Connection object for accepted connection

236

"""

237

238

def close(self):

239

"""

240

Close the listener.

241

"""

242

243

@property

244

def address(self):

245

"""Address of the listener."""

246

247

@property

248

def last_accepted(self):

249

"""Address of last accepted connection."""

250

251

def Client(address, family=None, authkey=None) -> Connection:

252

"""

253

Create a client connection.

254

255

Parameters:

256

- address: address to connect to

257

- family: address family

258

- authkey: authentication key

259

260

Returns:

261

Connection object

262

"""

263

```

264

265

Usage example:

266

267

```python

268

from billiard import Process

269

from billiard.connection import Listener, Client

270

import time

271

272

def server_process(address):

273

"""Server that accepts connections"""

274

with Listener(address, authkey=b'secret') as listener:

275

print(f"Server listening on {listener.address}")

276

277

# Accept multiple connections

278

for i in range(3):

279

print(f"Waiting for connection {i+1}...")

280

conn = listener.accept()

281

print(f"Connection {i+1} from {listener.last_accepted}")

282

283

# Handle client

284

try:

285

while True:

286

msg = conn.recv()

287

print(f"Server received: {msg}")

288

conn.send(f"Echo: {msg}")

289

except EOFError:

290

print(f"Client {i+1} disconnected")

291

finally:

292

conn.close()

293

294

def client_process(address, client_id):

295

"""Client that connects to server"""

296

try:

297

conn = Client(address, authkey=b'secret')

298

print(f"Client {client_id} connected")

299

300

# Send messages

301

for i in range(3):

302

msg = f"Message {i} from client {client_id}"

303

conn.send(msg)

304

response = conn.recv()

305

print(f"Client {client_id} got response: {response}")

306

time.sleep(0.5)

307

308

conn.close()

309

print(f"Client {client_id} finished")

310

311

except Exception as e:

312

print(f"Client {client_id} error: {e}")

313

314

if __name__ == '__main__':

315

# Use a Unix domain socket on Unix or a TCP socket on localhost on Windows

316

import sys

317

if sys.platform == 'win32':

318

address = ('localhost', 6000)

319

else:

320

address = '/tmp/billiard_socket'

321

322

# Start server

323

server_proc = Process(target=server_process, args=(address,))

324

server_proc.start()

325

326

time.sleep(0.5) # Let server start

327

328

# Start clients

329

clients = []

330

for i in range(3):

331

client_proc = Process(target=client_process, args=(address, i))

332

clients.append(client_proc)

333

client_proc.start()

334

335

# Wait for completion

336

for client_proc in clients:

337

client_proc.join()

338

339

server_proc.join()

340

```

341

342

### Connection Utilities

343

344

Utility functions for working with multiple connections.

345

346

```python { .api }

347

def wait(object_list, timeout=None) -> list:

348

"""

349

Wait until one or more connections/objects are ready.

350

351

Parameters:

352

- object_list: list of Connection objects or other waitable objects

353

- timeout: timeout in seconds (None for no timeout)

354

355

Returns:

356

List of objects that are ready for reading

357

"""

358

```

359

360

Usage example:

361

362

```python

363

from billiard import Process, Pipe

364

from billiard.connection import wait

365

import time

366

import random

367

368

def delayed_sender(conn, delay, message):

369

"""Send message after delay"""

370

time.sleep(delay)

371

conn.send(message)

372

conn.close()

373

374

def multi_connection_wait():

375

"""Demonstrate waiting on multiple connections"""

376

connections = []

377

processes = []

378

379

# Create multiple pipes with different delays

380

for i in range(4):

381

parent_conn, child_conn = Pipe()

382

connections.append(parent_conn)

383

384

delay = random.uniform(0.5, 2.0)

385

message = f"Message from connection {i}"

386

387

proc = Process(target=delayed_sender, args=(child_conn, delay, message))

388

processes.append(proc)

389

proc.start()

390

391

print("Waiting for messages from multiple connections...")

392

393

# Wait for connections to become ready

394

ready_count = 0

395

while ready_count < len(connections):

396

ready = wait(connections, timeout=3.0)

397

398

if ready:

399

print(f"{len(ready)} connections ready")

400

for conn in ready:

401

try:

402

msg = conn.recv()

403

print(f"Received: {msg}")

404

ready_count += 1

405

connections.remove(conn)

406

except EOFError:

407

pass

408

else:

409

print("Timeout waiting for connections")

410

break

411

412

# Clean up

413

for proc in processes:

414

proc.join()

415

416

for conn in connections:

417

conn.close()

418

419

if __name__ == '__main__':

420

multi_connection_wait()

421

```

422

423

## Communication Patterns

424

425

### Producer-Consumer with Pipes

426

427

```python

428

from billiard import Process, Pipe

429

import time

430

431

def producer(conn):

432

for i in range(5):

433

item = f"item_{i}"

434

conn.send(item)

435

print(f"Produced: {item}")

436

time.sleep(0.5)

437

conn.send(None) # Signal end

438

conn.close()

439

440

def consumer(conn):

441

while True:

442

item = conn.recv()

443

if item is None:

444

break

445

print(f"Consumed: {item}")

446

time.sleep(0.3)

447

conn.close()

448

449

# Usage

450

parent_conn, child_conn = Pipe()

451

prod = Process(target=producer, args=(parent_conn,))

452

cons = Process(target=consumer, args=(child_conn,))

453

prod.start()

454

cons.start()

455

prod.join()

456

cons.join()

457

```

458

459

### Request-Response Pattern

460

461

```python

462

from billiard import Process, Pipe

463

464

def service(conn):

465

while True:

466

try:

467

request = conn.recv()

468

response = f"Processed: {request}"

469

conn.send(response)

470

except EOFError:

471

break

472

conn.close()

473

474

def client_requests(conn):

475

for i in range(3):

476

request = f"request_{i}"

477

conn.send(request)

478

response = conn.recv()

479

print(f"Got response: {response}")

480

conn.close()

481

482

# Usage

483

service_conn, client_conn = Pipe()

484

srv = Process(target=service, args=(service_conn,))

485

cli = Process(target=client_requests, args=(client_conn,))

486

srv.start()

487

cli.start()

488

cli.join()

489

srv.terminate()

490

srv.join()

491

```