
# Process Pools

Parallel task execution using worker process pools. The Pool class provides a convenient way to distribute tasks across multiple processes with various execution patterns and result handling options.

## Capabilities

### Pool Class

Main class for managing a pool of worker processes for parallel task execution.

```python { .api }
class Pool:
    """
    A pool of worker processes for parallel task execution.

    Args:
        processes: number of worker processes (default: cpu_count())
        initializer: callable to run on each worker process startup
        initargs: arguments for the initializer function
        maxtasksperchild: maximum tasks per worker before restart (default: None)
        context: multiprocess context to use for creating processes
    """
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None): ...

    def map(self, func, iterable, chunksize=None):
        """
        Apply function to every item of iterable and return a list of results.

        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes

        Returns:
            list: results in same order as input
        """

    def map_async(self, func, iterable, chunksize=None, callback=None,
                  error_callback=None):
        """
        Asynchronous version of map() method.

        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes
            callback: function to call with results when complete
            error_callback: function to call if an error occurs

        Returns:
            AsyncResult: result object for async operation
        """

    def imap(self, func, iterable, chunksize=1):
        """
        Lazy version of map() that returns an iterator.

        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes

        Returns:
            iterator: iterator over results
        """

    def imap_unordered(self, func, iterable, chunksize=1):
        """
        Like imap() but results can be returned in any order.

        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes

        Returns:
            iterator: iterator over results in arbitrary order
        """

    def starmap(self, func, iterable, chunksize=None):
        """
        Like map() but arguments are unpacked from tuples.

        Args:
            func: function to apply (called with *args from each tuple)
            iterable: sequence of tuples containing arguments
            chunksize: items per task sent to worker processes

        Returns:
            list: results in same order as input
        """

    def starmap_async(self, func, iterable, chunksize=None, callback=None,
                      error_callback=None):
        """
        Asynchronous version of starmap() method.

        Args:
            func: function to apply (called with *args from each tuple)
            iterable: sequence of tuples containing arguments
            chunksize: items per task sent to worker processes
            callback: function to call with results when complete
            error_callback: function to call if an error occurs

        Returns:
            AsyncResult: result object for async operation
        """

    def apply(self, func, args=(), kwds={}):
        """
        Apply function with arguments and return the result.

        Args:
            func: function to call
            args: positional arguments for func
            kwds: keyword arguments for func

        Returns:
            object: result of function call
        """

    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        """
        Asynchronous version of apply() method.

        Args:
            func: function to call
            args: positional arguments for func
            kwds: keyword arguments for func
            callback: function to call with result when complete
            error_callback: function to call if an error occurs

        Returns:
            AsyncResult: result object for async operation
        """

    def close(self):
        """
        Prevent any more tasks from being submitted to the pool.
        Once closed, no new tasks can be submitted.
        """

    def terminate(self):
        """
        Stop the worker processes immediately without completing work.
        """

    def join(self):
        """
        Wait for the worker processes to exit.
        Must call close() or terminate() before using join().
        """

    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - closes pool and joins workers."""
```

### AsyncResult Class

Object representing the result of an asynchronous operation.

```python { .api }
class AsyncResult:
    """
    Result object for asynchronous pool operations.
    """
    def get(self, timeout=None):
        """
        Return the result when it arrives.

        Args:
            timeout: maximum time to wait (seconds)

        Returns:
            object: result of the operation

        Raises:
            TimeoutError: if timeout exceeded
        """

    def wait(self, timeout=None):
        """
        Wait until the result is available.

        Args:
            timeout: maximum time to wait (seconds)

        Returns:
            bool: True if result is available, False if timeout
        """

    def ready(self):
        """
        Return True if the operation is complete.

        Returns:
            bool: True if operation is complete
        """

    def successful(self):
        """
        Return True if the operation completed without error.
        Must call ready() first to ensure operation is complete.

        Returns:
            bool: True if successful

        Raises:
            ValueError: if operation is not yet complete
        """
```

## Usage Examples

### Basic Pool Map

```python
from multiprocess import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        results = pool.map(square, numbers)
        print(f"Results: {results}")
        # Output: Results: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```

### Asynchronous Processing

```python
from multiprocess import Pool
import time

def slow_function(x):
    time.sleep(1)  # Simulate slow work
    return x * x

def result_callback(result):
    print(f"Got result: {result}")

def error_callback(error):
    print(f"Got error: {error}")

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # Submit async job
        async_result = pool.map_async(
            slow_function,
            [1, 2, 3, 4],
            callback=result_callback,
            error_callback=error_callback
        )

        # Do other work while waiting
        print("Doing other work...")
        time.sleep(0.5)
        print("Still working...")

        # Get results (blocks until complete)
        results = async_result.get(timeout=10)
        print(f"Final results: {results}")
```

### Starmap for Multiple Arguments

```python
from multiprocess import Pool

def multiply(x, y):
    return x * y

def power(base, exponent):
    return base ** exponent

if __name__ == '__main__':
    with Pool(processes=3) as pool:
        # Each tuple contains arguments for the function
        multiply_args = [(2, 3), (4, 5), (6, 7)]
        multiply_results = pool.starmap(multiply, multiply_args)
        print(f"Multiply results: {multiply_results}")
        # Output: Multiply results: [6, 20, 42]

        power_args = [(2, 3), (3, 2), (4, 2), (5, 2)]
        power_results = pool.starmap(power, power_args)
        print(f"Power results: {power_results}")
        # Output: Power results: [8, 9, 16, 25]
```

### Iterator-based Processing

```python
from multiprocess import Pool
import time

def process_item(x):
    # Simulate variable processing time
    time.sleep(x * 0.1)
    return x * x

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        items = range(1, 11)

        # Ordered iterator (results in input order)
        print("Ordered results:")
        for result in pool.imap(process_item, items, chunksize=2):
            print(f"Got result: {result}")

        print("\nUnordered results:")
        # Unordered iterator (results as they complete)
        for result in pool.imap_unordered(process_item, items, chunksize=2):
            print(f"Got result: {result}")
```

### Pool with Initializer

```python
from multiprocess import Pool
import os

# Global variable in worker processes
worker_state = None

def init_worker(initial_value):
    global worker_state
    worker_state = initial_value
    print(f"Worker {os.getpid()} initialized with {initial_value}")

def worker_task(x):
    global worker_state
    pid = os.getpid()
    result = x + worker_state
    print(f"Worker {pid} processed {x} with state {worker_state} = {result}")
    return result

if __name__ == '__main__':
    # Each worker will be initialized with value 100
    with Pool(processes=2, initializer=init_worker, initargs=(100,)) as pool:
        tasks = [1, 2, 3, 4, 5]
        results = pool.map(worker_task, tasks)
        print(f"Results: {results}")
```

### Error Handling

```python
from multiprocess import Pool
import random

def unreliable_function(x):
    if random.random() < 0.3:  # 30% chance of error
        raise ValueError(f"Error processing {x}")
    return x * x

def handle_result(result):
    print(f"Success: {result}")

def handle_error(error):
    print(f"Error occurred: {error}")

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # Submit multiple async tasks
        async_results = []
        for i in range(10):
            result = pool.apply_async(
                unreliable_function,
                (i,),
                callback=handle_result,
                error_callback=handle_error
            )
            async_results.append(result)

        # Wait for all tasks and handle individual results
        for i, async_result in enumerate(async_results):
            try:
                result = async_result.get(timeout=5)
                print(f"Task {i} completed successfully: {result}")
            except Exception as e:
                print(f"Task {i} failed: {e}")
```

### Pool with Context Manager and Resource Cleanup

```python
from multiprocess import Pool
import time
import os

def cpu_intensive_task(n):
    """Simulate CPU-intensive work"""
    pid = os.getpid()
    start_time = time.time()

    # Simulate computation
    total = 0
    for i in range(n * 1000000):
        total += i * i

    end_time = time.time()
    duration = end_time - start_time

    return {
        'pid': pid,
        'input': n,
        'result': total,
        'duration': duration
    }

if __name__ == '__main__':
    tasks = [10, 20, 30, 40, 50]

    # Using context manager ensures proper cleanup
    with Pool(processes=3) as pool:
        print("Starting parallel processing...")
        start_time = time.time()

        # Process tasks in parallel
        results = pool.map(cpu_intensive_task, tasks)

        end_time = time.time()
        total_time = end_time - start_time

        print(f"\nAll tasks completed in {total_time:.2f} seconds")
        print("\nResults:")
        for result in results:
            print(f"PID {result['pid']}: input={result['input']}, "
                  f"duration={result['duration']:.3f}s")
```

### Chunking for Performance

```python
from multiprocess import Pool
import time

def simple_task(x):
    return x * x

def benchmark_chunking(items, pool_size, chunk_sizes):
    """Benchmark different chunk sizes"""
    for chunk_size in chunk_sizes:
        with Pool(processes=pool_size) as pool:
            start_time = time.time()
            results = pool.map(simple_task, items, chunksize=chunk_size)
            end_time = time.time()

            duration = end_time - start_time
            print(f"Chunk size {chunk_size}: {duration:.3f} seconds")

if __name__ == '__main__':
    # Large dataset
    items = list(range(10000))
    pool_size = 4

    # Test different chunk sizes
    chunk_sizes = [1, 10, 50, 100, 500, 1000]

    print("Benchmarking chunk sizes:")
    benchmark_chunking(items, pool_size, chunk_sizes)
```

### Advanced: Custom Result Processing

```python
from multiprocess import Pool
import json
import time

def fetch_and_process_data(item_id):
    """Simulate fetching and processing data"""
    # Simulate network delay
    time.sleep(0.1)

    # Simulate data processing
    data = {
        'id': item_id,
        'value': item_id * 10,
        'processed_at': time.time(),
        'status': 'completed'
    }

    return data

def save_result(result):
    """Callback to save each result as it completes"""
    with open(f"result_{result['id']}.json", 'w') as f:
        json.dump(result, f)
    print(f"Saved result for item {result['id']}")

if __name__ == '__main__':
    item_ids = list(range(1, 21))  # Process 20 items

    with Pool(processes=4) as pool:
        # Submit all tasks asynchronously with callback
        async_results = []
        for item_id in item_ids:
            result = pool.apply_async(
                fetch_and_process_data,
                (item_id,),
                callback=save_result
            )
            async_results.append(result)

        # Monitor progress
        completed = 0
        while completed < len(async_results):
            ready_count = sum(1 for r in async_results if r.ready())
            if ready_count > completed:
                completed = ready_count
                print(f"Progress: {completed}/{len(async_results)} tasks completed")
            time.sleep(0.5)

        print("All tasks completed!")
```