or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

communication.mdcontext-config.mdindex.mdpools.mdprocess-management.mdshared-objects.mdsynchronization.md

communication.mddocs/

0

# Inter-Process Communication

1

2

Communication mechanisms for data exchange between processes. Multiprocess provides queues for message passing and pipes for bidirectional communication, with enhanced serialization support via dill.

3

4

## Capabilities

5

6

### Queue Classes

7

8

Thread-safe queues for passing objects between processes with FIFO semantics.

9

10

```python { .api }

11

class Queue:

12

"""

13

A multi-producer, multi-consumer queue for inter-process communication.

14

15

Args:

16

maxsize: maximum size of the queue (0 = unlimited)

17

"""

18

def __init__(self, maxsize=0): ...

19

20

def put(self, item, block=True, timeout=None):

21

"""

22

Put an item into the queue.

23

24

Args:

25

item: object to put in queue

26

block: if True, block until space is available

27

timeout: maximum time to wait (seconds)

28

29

Raises:

30

queue.Full: if queue is full and block=False or timeout exceeded

31

"""

32

33

def get(self, block=True, timeout=None):

34

"""

35

Remove and return an item from the queue.

36

37

Args:

38

block: if True, block until item is available

39

timeout: maximum time to wait (seconds)

40

41

Returns:

42

object: item from queue

43

44

Raises:

45

queue.Empty: if queue is empty and block=False or timeout exceeded

46

"""

47

48

def put_nowait(self, item):

49

"""

50

Put an item without blocking.

51

52

Args:

53

item: object to put in queue

54

55

Raises:

56

queue.Full: if queue is full

57

"""

58

59

def get_nowait(self):

60

"""

61

Get an item without blocking.

62

63

Returns:

64

object: item from queue

65

66

Raises:

67

queue.Empty: if queue is empty

68

"""

69

70

def empty(self):

71

"""

72

Return True if the queue is empty.

73

74

Returns:

75

bool: True if queue is empty (approximate)

76

"""

77

78

def full(self):

79

"""

80

Return True if the queue is full.

81

82

Returns:

83

bool: True if queue is full (approximate)

84

"""

85

86

def qsize(self):

87

"""

88

Return the approximate size of the queue.

89

90

Returns:

91

int: approximate number of items in queue

92

"""

93

94

def close(self):

95

"""Indicate that no more data will be put on this queue."""

96

97

def join_thread(self):

98

"""Join the background thread used by the queue."""

99

```

100

101

### JoinableQueue Class

102

103

Queue with task tracking capabilities for producer-consumer patterns.

104

105

```python { .api }

106

class JoinableQueue(Queue):

107

"""

108

A Queue subclass that adds task tracking capabilities.

109

110

Args:

111

maxsize: maximum size of the queue (0 = unlimited)

112

"""

113

def __init__(self, maxsize=0): ...

114

115

def task_done(self):

116

"""

117

Indicate that a formerly enqueued task is complete.

118

119

Must be called once for each item retrieved from the queue.

120

"""

121

122

def join(self):

123

"""

124

Block until all items in the queue have been gotten and processed.

125

126

The count of unfinished tasks goes up when items are added and

127

goes down when task_done() is called.

128

"""

129

```

130

131

### SimpleQueue Class

132

133

Simplified queue implementation with minimal overhead.

134

135

```python { .api }

136

class SimpleQueue:

137

"""

138

A simplified queue implementation with lower overhead.

139

"""

140

def __init__(self): ...

141

142

def put(self, item):

143

"""

144

Put an item into the queue.

145

146

Args:

147

item: object to put in queue

148

"""

149

150

def get(self):

151

"""

152

Remove and return an item from the queue.

153

154

Returns:

155

object: item from queue

156

"""

157

158

def empty(self):

159

"""

160

Return True if the queue is empty.

161

162

Returns:

163

bool: True if queue is empty

164

"""

165

166

def close(self):

167

"""Indicate that no more data will be put on this queue."""

168

```

169

170

### Pipe Communication

171

172

Bidirectional communication channels between processes.

173

174

```python { .api }

175

def Pipe(duplex=True):

176

"""

177

Create a pipe between two processes.

178

179

Args:

180

duplex: if True, pipe is bidirectional; if False, unidirectional

181

182

Returns:

183

tuple[Connection, Connection]: pair of Connection objects

184

"""

185

```

186

187

#### Connection Objects

188

189

```python { .api }

190

class Connection:

191

"""

192

Connection object for pipe communication.

193

"""

194

def send(self, obj):

195

"""

196

Send an object through the connection.

197

198

Args:

199

obj: object to send

200

"""

201

202

def recv(self):

203

"""

204

Receive an object from the connection.

205

206

Returns:

207

object: received object

208

"""

209

210

def send_bytes(self, buffer, offset=0, size=None):

211

"""

212

Send byte data through the connection.

213

214

Args:

215

buffer: bytes-like object to send

216

offset: starting position in buffer

217

size: number of bytes to send

218

"""

219

220

def recv_bytes(self, maxlength=None):

221

"""

222

Receive byte data from the connection.

223

224

Args:

225

maxlength: maximum number of bytes to receive

226

227

Returns:

228

bytes: received byte data

229

"""

230

231

def recv_bytes_into(self, buffer, offset=0):

232

"""

233

Receive byte data into an existing buffer.

234

235

Args:

236

buffer: writable buffer to receive into

237

offset: starting position in buffer

238

239

Returns:

240

int: number of bytes received

241

"""

242

243

def poll(self, timeout=0.0):

244

"""

245

Check if data is available for reading.

246

247

Args:

248

timeout: time to wait for data (seconds)

249

250

Returns:

251

bool: True if data is available

252

"""

253

254

def close(self):

255

"""Close the connection."""

256

257

# Properties

258

readable: bool # True if connection can receive

259

writable: bool # True if connection can send

260

closed: bool # True if connection is closed

261

```

262

263

### Advanced Connection Functions

264

265

Additional functions for working with connections and sockets.

266

267

```python { .api }

268

def wait(object_list, timeout=None):

269

"""

270

Wait until one or more objects in object_list are ready.

271

272

Args:

273

object_list: list of Connection objects or other waitable objects

274

timeout: maximum time to wait (seconds)

275

276

Returns:

277

list: subset of object_list that are ready

278

"""

279

280

class Listener:

281

"""

282

A wrapper for a bound socket which is 'listening' for connections.

283

284

Args:

285

address: address to bind to

286

family: socket family (default: None for auto-detection)

287

backlog: maximum number of pending connections

288

authkey: authentication key for connections

289

"""

290

def __init__(self, address=None, family=None, backlog=1, authkey=None): ...

291

292

def accept(self):

293

"""

294

Accept a connection on the bound socket.

295

296

Returns:

297

Connection: new connection object

298

"""

299

300

def close(self):

301

"""Close the listener."""

302

303

# Properties

304

address: tuple # Address the listener is bound to

305

last_accepted: Connection # Last accepted connection

306

307

def Client(address, family=None, authkey=None):

308

"""

309

Connect to a Listener and return a Connection object.

310

311

Args:

312

address: address to connect to

313

family: socket family (default: None for auto-detection)

314

authkey: authentication key

315

316

Returns:

317

Connection: connection object

318

"""

319

```

320

321

## Usage Examples

322

323

### Basic Queue Communication

324

325

```python

326

from multiprocess import Process, Queue

327

328

def producer(q, items):

329

for item in items:

330

print(f"Producing {item}")

331

q.put(item)

332

q.put(None) # Signal completion

333

334

def consumer(q):

335

while True:

336

item = q.get()

337

if item is None:

338

break

339

print(f"Consuming {item}")

340

341

# Create queue

342

queue = Queue()

343

344

# Create processes

345

items_to_produce = ['item1', 'item2', 'item3', 'item4']

346

prod = Process(target=producer, args=(queue, items_to_produce))

347

cons = Process(target=consumer, args=(queue,))

348

349

# Start processes

350

prod.start()

351

cons.start()

352

353

# Wait for completion

354

prod.join()

355

cons.join()

356

```

357

358

### JoinableQueue with Task Tracking

359

360

```python

361

from multiprocess import Process, JoinableQueue

362

import time

363

364

def worker(q):

365

while True:

366

item = q.get()

367

if item is None:

368

break

369

print(f"Processing {item}")

370

time.sleep(1) # Simulate work

371

q.task_done()

372

373

def add_tasks(q, tasks):

374

for task in tasks:

375

q.put(task)

376

377

# Create joinable queue

378

q = JoinableQueue()

379

380

# Start worker processes

381

workers = []

382

for i in range(2):

383

p = Process(target=worker, args=(q,))

384

p.start()

385

workers.append(p)

386

387

# Add tasks

388

tasks = [f"task-{i}" for i in range(5)]

389

for task in tasks:

390

q.put(task)

391

392

# Wait for all tasks to complete

393

q.join()

394

print("All tasks completed")

395

396

# Stop workers

397

for _ in workers:

398

q.put(None)

399

for p in workers:

400

p.join()

401

```

402

403

### Pipe Communication

404

405

```python

406

from multiprocess import Process, Pipe

407

408

def sender(conn, messages):

409

for msg in messages:

410

print(f"Sending: {msg}")

411

conn.send(msg)

412

conn.send("DONE")

413

conn.close()

414

415

def receiver(conn):

416

while True:

417

msg = conn.recv()

418

print(f"Received: {msg}")

419

if msg == "DONE":

420

break

421

conn.close()

422

423

# Create pipe

424

parent_conn, child_conn = Pipe()

425

426

# Create processes

427

messages = ["Hello", "World", "From", "Pipe"]

428

p1 = Process(target=sender, args=(parent_conn, messages))

429

p2 = Process(target=receiver, args=(child_conn))

430

431

p1.start()

432

p2.start()

433

434

p1.join()

435

p2.join()

436

```

437

438

### Bidirectional Pipe Communication

439

440

```python

441

from multiprocess import Process, Pipe

442

import time

443

444

def ping_pong(conn, name, count):

445

for i in range(count):

446

if name == "ping":

447

msg = f"ping-{i}"

448

conn.send(msg)

449

print(f"Sent: {msg}")

450

response = conn.recv()

451

print(f"Received: {response}")

452

else:

453

request = conn.recv()

454

print(f"Received: {request}")

455

msg = f"pong-{i}"

456

conn.send(msg)

457

print(f"Sent: {msg}")

458

time.sleep(0.5)

459

460

# Create duplex pipe

461

conn1, conn2 = Pipe(duplex=True)

462

463

# Create processes

464

p1 = Process(target=ping_pong, args=(conn1, "ping", 3))

465

p2 = Process(target=ping_pong, args=(conn2, "pong", 3))

466

467

p1.start()

468

p2.start()

469

470

p1.join()

471

p2.join()

472

```

473

474

### Multiple Producers and Consumers

475

476

```python

477

from multiprocess import Process, Queue

478

import random

479

import time

480

481

def producer(q, producer_id, num_items):

482

for i in range(num_items):

483

item = f"Producer-{producer_id}-Item-{i}"

484

q.put(item)

485

print(f"Produced: {item}")

486

time.sleep(random.uniform(0.1, 0.5))

487

488

def consumer(q, consumer_id):

489

while True:

490

try:

491

item = q.get(timeout=2)

492

print(f"Consumer-{consumer_id} consumed: {item}")

493

time.sleep(random.uniform(0.2, 0.8))

494

except:

495

print(f"Consumer-{consumer_id} timed out, exiting")

496

break

497

498

# Create queue

499

queue = Queue()

500

501

# Create multiple producers

502

producers = []

503

for i in range(2):

504

p = Process(target=producer, args=(queue, i, 5))

505

p.start()

506

producers.append(p)

507

508

# Create multiple consumers

509

consumers = []

510

for i in range(3):

511

p = Process(target=consumer, args=(queue, i))

512

p.start()

513

consumers.append(p)

514

515

# Wait for producers to finish

516

for p in producers:

517

p.join()

518

519

# Wait for consumers to finish (they will timeout)

520

for p in consumers:

521

p.join()

522

```

523

524

### Enhanced Serialization Example

525

526

```python

527

from multiprocess import Process, Queue

528

import pickle

529

530

# Complex object that requires dill serialization

531

class ComplexObject:

532

def __init__(self, func):

533

self.func = func

534

self.data = [1, 2, 3, 4, 5]

535

536

def process(self):

537

return [self.func(x) for x in self.data]

538

539

def worker(q):

540

while True:

541

obj = q.get()

542

if obj is None:

543

break

544

result = obj.process()

545

print(f"Worker result: {result}")

546

547

# Create object with lambda function (requires dill)

548

complex_obj = ComplexObject(lambda x: x ** 2)

549

550

# Create queue and process

551

queue = Queue()

552

p = Process(target=worker, args=(queue,))

553

p.start()

554

555

# Send complex object (automatically serialized with dill)

556

queue.put(complex_obj)

557

queue.put(None) # Signal completion

558

559

p.join()

560

```

561

562

### Connection with Authentication

563

564

```python

565

from multiprocess import Process

566

from multiprocess.connection import Listener, Client

567

568

def server(address, authkey):

569

with Listener(address, authkey=authkey) as listener:

570

print(f"Server listening on {address}")

571

with listener.accept() as conn:

572

print("Connection accepted")

573

while True:

574

try:

575

msg = conn.recv()

576

print(f"Server received: {msg}")

577

if msg == "quit":

578

break

579

conn.send(f"Echo: {msg}")

580

except EOFError:

581

break

582

583

def client(address, authkey):

584

with Client(address, authkey=authkey) as conn:

585

messages = ["hello", "world", "quit"]

586

for msg in messages:

587

conn.send(msg)

588

if msg != "quit":

589

response = conn.recv()

590

print(f"Client received: {response}")

591

592

# Authentication key

593

authkey = b'secret_key'

594

address = ('localhost', 6000)

595

596

# Start server process

597

server_process = Process(target=server, args=(address, authkey))

598

server_process.start()

599

600

# Give server time to start

601

import time

602

time.sleep(0.5)

603

604

# Start client process

605

client_process = Process(target=client, args=(address, authkey))

606

client_process.start()

607

608

client_process.join()

609

server_process.join()

610

```