or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

communication.md · context-management.md · index.md · managers.md · process-management.md · process-pools.md · queues.md · shared-memory.md · synchronization.md

process-pools.mddocs/

0

# Process Pools

1

2

Advanced process pool implementation for parallel execution with timeout support, worker management, and enhanced error handling. Billiard's Pool class extends Python's standard multiprocessing.Pool with additional features for production environments.

3

4

## Capabilities

5

6

### Pool Creation and Configuration

7

8

Create process pools with extensive configuration options for timeout handling, worker management, and restart policies.

9

10

```python { .api }
class Pool:
    """
    A process pool object which controls worker processes to execute tasks.
    """
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, timeout=None, soft_timeout=None,
                 lost_worker_timeout=None, max_restarts=None, max_restart_freq=1,
                 on_process_up=None, on_process_down=None, on_timeout_set=None,
                 on_timeout_cancel=None, threads=True, semaphore=None, putlocks=False,
                 allow_restart=False, synack=False, on_process_exit=None,
                 context=None, max_memory_per_child=None, enable_timeouts=False):
        """
        Create a process pool.

        Parameters:
        - processes: number of worker processes (default: cpu_count())
        - initializer: callable to run on worker startup
        - initargs: arguments for initializer
        - maxtasksperchild: tasks per worker before restart
        - timeout: hard timeout for tasks (seconds)
        - soft_timeout: soft timeout allowing cleanup (seconds)
        - lost_worker_timeout: timeout for detecting lost workers
        - max_restarts: maximum worker restarts
        - max_restart_freq: restart frequency limit
        - on_process_up: callback when worker starts
        - on_process_down: callback when worker stops
        - on_timeout_set: callback when timeout is set (job, soft_timeout, hard_timeout)
        - on_timeout_cancel: callback when timeout is cancelled (job)
        - threads: use threads for result handling
        - semaphore: custom semaphore for task limiting
        - putlocks: use locks for putting tasks
        - allow_restart: allow pool restarts
        - synack: enable synchronous acknowledgment mode for task cancellation
        - on_process_exit: callback when process exits (pid, exitcode)
        - context: multiprocessing context to use (default: None)
        - max_memory_per_child: memory limit per child process in kilobytes
        - enable_timeouts: explicitly enable timeout handling (default: False)
        """
```

50

51

Usage example:

52

53

```python
from billiard import Pool
import os
import time
import signal

def init_worker():
    """Initialize worker process"""
    print(f"Worker {os.getpid()} initialized")
    # Ignore interrupt signals in worker
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def long_task(x):
    """Task that might take a while"""
    time.sleep(x * 0.1)  # Simulate work
    return x * x

def worker_up_callback(pid):
    print(f"Worker {pid} started")

def worker_down_callback(pid, exitcode):
    print(f"Worker {pid} stopped with exit code {exitcode}")

# Create pool with advanced configuration
with Pool(
    processes=4,
    initializer=init_worker,
    timeout=30,            # Hard timeout: 30 seconds
    soft_timeout=25,       # Soft timeout: 25 seconds
    maxtasksperchild=100,  # Restart workers after 100 tasks
    max_restarts=5,        # Allow up to 5 worker restarts
    on_process_up=worker_up_callback,
    on_process_down=worker_down_callback,
    allow_restart=True
) as pool:
    # Submit tasks
    numbers = list(range(20))
    results = pool.map(long_task, numbers)
    print(f"Results: {results}")
```

93

94

### Synchronous Task Execution

95

96

Execute tasks synchronously with blocking calls that return results immediately.

97

98

```python { .api }
def apply(self, func, args=(), kwds={}):
    """
    Call func with arguments args and keyword arguments kwds.
    Blocks until result is ready.

    Parameters:
    - func: callable to execute
    - args: positional arguments
    - kwds: keyword arguments

    Returns:
    Result of func(*args, **kwds)
    """

def map(self, func, iterable, chunksize=None):
    """
    Apply func to each element of iterable, collecting results in a list.

    Parameters:
    - func: callable to apply
    - iterable: sequence of arguments
    - chunksize: size of chunks sent to workers

    Returns:
    List of results
    """

def starmap(self, func, iterable, chunksize=None):
    """
    Like map() but arguments are unpacked from tuples.

    Parameters:
    - func: callable to apply
    - iterable: sequence of argument tuples
    - chunksize: size of chunks sent to workers

    Returns:
    List of results
    """
```

139

140

Usage example:

141

142

```python
from billiard import Pool

def add(a, b):
    return a + b

def square(x):
    # NOTE: tasks must be module-level functions — lambdas cannot be
    # pickled and sent to worker processes.
    return x ** 2

def multiply(args):
    x, y = args
    return x * y

with Pool(processes=2) as pool:
    # Apply single function call
    result = pool.apply(add, (5, 3))
    print(f"5 + 3 = {result}")

    # Map function over sequence
    numbers = [1, 2, 3, 4, 5]
    squares = pool.map(square, numbers)
    print(f"Squares: {squares}")

    # Starmap with argument tuples
    pairs = [(2, 3), (4, 5), (6, 7)]
    products = pool.starmap(multiply, pairs)
    print(f"Products: {products}")
```

167

168

### Asynchronous Task Execution

169

170

Execute tasks asynchronously with non-blocking calls that return result objects for later retrieval.

171

172

```python { .api }
def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None,
                accept_callback=None, timeout_callback=None, waitforslot=None,
                soft_timeout=None, timeout=None, lost_worker_timeout=None,
                callbacks_propagate=None, correlation_id=None):
    """
    Asynchronous version of apply() method.

    Parameters:
    - func: callable to execute
    - args: positional arguments
    - kwds: keyword arguments
    - callback: callable for successful results
    - error_callback: callable for exceptions
    - accept_callback: callable for task acceptance
    - timeout_callback: callable for task timeout
    - waitforslot: wait for available slot before submitting
    - soft_timeout: task-specific soft timeout (seconds)
    - timeout: task-specific hard timeout (seconds)
    - lost_worker_timeout: worker loss timeout for this task
    - callbacks_propagate: control error propagation through callbacks
    - correlation_id: identifier for task correlation

    Returns:
    ApplyResult object
    """

def map_async(self, func, iterable, chunksize=None, callback=None, error_callback=None):
    """
    Asynchronous version of map() method.

    Parameters:
    - func: callable to apply
    - iterable: sequence of arguments
    - chunksize: size of chunks sent to workers
    - callback: callable for successful results
    - error_callback: callable for exceptions

    Returns:
    MapResult object
    """

def starmap_async(self, func, iterable, chunksize=None, callback=None, error_callback=None):
    """
    Asynchronous version of starmap() method.

    Parameters:
    - func: callable to apply
    - iterable: sequence of argument tuples
    - chunksize: size of chunks sent to workers
    - callback: callable for successful results
    - error_callback: callable for exceptions

    Returns:
    MapResult object
    """

def imap(self, func, iterable, chunksize=1):
    """
    Lazy equivalent of map() returning an iterator.

    Parameters:
    - func: callable to apply
    - iterable: sequence of arguments
    - chunksize: size of chunks sent to workers

    Returns:
    Iterator yielding results
    """

def imap_unordered(self, func, iterable, chunksize=1):
    """
    Like imap() but results may be returned in arbitrary order.

    Parameters:
    - func: callable to apply
    - iterable: sequence of arguments
    - chunksize: size of chunks sent to workers

    Returns:
    Iterator yielding results in arbitrary order
    """
```

255

256

Usage example:

257

258

```python
from billiard import Pool
import time

def slow_task(x):
    time.sleep(0.1)
    return x * x

def success_callback(result):
    print(f"Task completed with result: {result}")

def error_callback(error):
    print(f"Task failed with error: {error}")

with Pool(processes=4) as pool:
    # Async apply
    result = pool.apply_async(
        slow_task,
        (5,),
        callback=success_callback,
        error_callback=error_callback
    )

    # Continue other work while task runs
    print("Task submitted, doing other work...")
    time.sleep(0.05)

    # Get result when ready
    value = result.get(timeout=1)
    print(f"Got result: {value}")

    # Async map
    numbers = list(range(10))
    map_result = pool.map_async(slow_task, numbers)

    # Use iterator for streaming results
    for i, result in enumerate(pool.imap(slow_task, range(5))):
        print(f"Streaming result {i}: {result}")
```

297

298

### Result Objects

299

300

Objects returned by asynchronous operations for result retrieval and status checking.

301

302

```python { .api }
class ApplyResult:
    """
    Result object returned by Pool.apply_async().
    """
    def get(self, timeout=None):
        """
        Return result when available.

        Parameters:
        - timeout: timeout in seconds (None for no timeout)

        Returns:
        Result value

        Raises:
        - TimeoutError: if timeout exceeded
        - Exception: if task raised exception
        """

    def wait(self, timeout=None):
        """
        Wait until result is available.

        Parameters:
        - timeout: timeout in seconds (None for no timeout)
        """

    def ready(self) -> bool:
        """Return whether result is ready."""

    def successful(self) -> bool:
        """Return whether task completed successfully (only valid if ready())."""

    def terminate(self, signum):
        """
        Terminate the job.

        Parameters:
        - signum: signal number for termination
        """

class MapResult(ApplyResult):
    """
    Result object returned by Pool.map_async() and related methods.
    Extends ApplyResult with additional functionality for map operations.
    """
```

350

351

Usage example:

352

353

```python
from billiard import Pool, TimeoutError

def risky_task(x):
    if x == 5:
        raise ValueError("Five is not allowed!")
    return x * 2

with Pool(processes=2) as pool:
    # Submit multiple async tasks
    results = []
    for i in range(8):
        result = pool.apply_async(risky_task, (i,))
        results.append(result)

    # Check results
    for i, result in enumerate(results):
        try:
            if result.ready():
                print(f"Task {i} ready: {result.successful()}")
                value = result.get(timeout=0.1)
                print(f"Task {i} result: {value}")
            else:
                print(f"Task {i} still running...")
                result.wait(timeout=1)
                value = result.get()
                print(f"Task {i} completed: {value}")
        except ValueError as e:
            print(f"Task {i} failed: {e}")
        except TimeoutError:
            print(f"Task {i} timed out")
```

385

386

### Pool Management

387

388

Methods for controlling pool lifecycle, worker management, and resource cleanup.

389

390

```python { .api }
def close(self):
    """
    Prevent any more tasks being submitted to pool.
    Outstanding work will complete before workers exit.
    """

def terminate(self):
    """
    Stop worker processes immediately without completing outstanding work.
    """

def join(self):
    """
    Wait for worker processes to exit. Must call close() or terminate() first.
    """

def restart(self):
    """
    Restart the pool (requires allow_restart=True).
    """

def grow(self, n=1):
    """
    Add n worker processes to the pool.

    Parameters:
    - n: number of workers to add
    """

def shrink(self, n=1):
    """
    Remove n worker processes from the pool.

    Parameters:
    - n: number of workers to remove
    """

def terminate_job(self, pid, sig=None):
    """
    Terminate a specific job by process ID.

    Parameters:
    - pid: process ID of worker to terminate
    - sig: signal to send (default: SIGTERM)
    """

def maintain_pool(self):
    """
    Maintain the pool by replacing dead workers.
    """

def send_ack(self, response, job, i, fd):
    """
    Send acknowledgment response for a task (used with synack mode).

    Parameters:
    - response: acknowledgment response
    - job: job being acknowledged
    - i: job index
    - fd: file descriptor for communication
    """

def did_start_ok(self) -> bool:
    """
    Check if the pool started successfully by verifying no workers have exited.

    Returns:
    True if pool started successfully, False otherwise
    """

def on_job_ready(self, job, i, obj, inqW_fd):
    """
    Hook method called when a job becomes ready for execution.

    Parameters:
    - job: the job object
    - i: job index
    - obj: job object data
    - inqW_fd: input queue write file descriptor
    """

def handle_result_event(self, *args):
    """
    Handle result events from the result handler.

    Parameters:
    - args: event arguments
    """

def cpu_count(self) -> int:
    """
    Return the number of CPUs with fallback logic.

    Returns:
    Number of available CPUs
    """

@property
def process_sentinels(self) -> list:
    """
    Return a list of process sentinel objects for monitoring worker processes.

    Returns:
    List of sentinel objects
    """
```

497

498

Usage example:

499

500

```python
from billiard import Pool
import time
import signal

def long_running_task(x):
    time.sleep(10)  # Very long task
    return x

# Create pool with restart capability
pool = Pool(processes=4, allow_restart=True)

try:
    # Submit some tasks
    results = []
    for i in range(8):
        result = pool.apply_async(long_running_task, (i,))
        results.append(result)

    # Let some tasks start
    time.sleep(1)

    # Dynamically manage pool size
    print("Growing pool...")
    pool.grow(2)  # Add 2 more workers

    time.sleep(2)

    print("Shrinking pool...")
    pool.shrink(1)  # Remove 1 worker

    # Terminate specific job if needed
    # pool.terminate_job(worker_pid, signal.SIGTERM)

    # Option 1: Graceful shutdown
    pool.close()
    pool.join()

except KeyboardInterrupt:
    # Option 2: Immediate shutdown
    print("Terminating pool...")
    pool.terminate()
    pool.join()

# Option 3: Restart pool
# pool.restart()
```