or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asynchronous-decorators.mdconcurrent-decorators.mdfuture-types-exceptions.mdindex.mdprocess-pools.mdsynchronization-utilities.mdthread-pools.md

thread-pools.mddocs/

0

# Thread Pools

1

2

Managed pools of worker threads for executing multiple tasks concurrently. Thread pools are ideal for I/O-bound tasks and provide fine-grained control over worker lifecycle, task scheduling, and resource management. They offer better resource utilization and management compared to creating individual threads.

3

4

## Capabilities

5

6

### ThreadPool Class

7

8

A managed pool of worker threads that can execute multiple tasks concurrently with automatic worker lifecycle management, task queuing, and optional worker restart capabilities.

9

10

```python { .api }

11

class ThreadPool:

12

def __init__(

13

self,

14

max_workers: int = multiprocessing.cpu_count(),

15

max_tasks: int = 0,

16

initializer: Callable = None,

17

initargs: tuple = ()

18

):

19

"""

20

Create a thread pool for concurrent task execution.

21

22

Parameters:

23

- max_workers: Maximum number of worker threads (defaults to CPU count)

24

- max_tasks: Maximum tasks per worker before restart (0 = no limit)

25

- initializer: Function called when each worker thread starts

26

- initargs: Arguments passed to initializer function

27

"""

28

```

29

30

#### Basic Usage

31

32

```python

33

from pebble import ThreadPool

34

import time

35

36

# Create pool with default settings

37

pool = ThreadPool()

38

39

# Create pool with custom configuration

40

pool = ThreadPool(max_workers=4, max_tasks=10)

41

42

def io_task(duration, message):

43

time.sleep(duration)

44

return f"Completed: {message}"

45

46

# Schedule tasks

47

future1 = pool.schedule(io_task, args=(1, "Task 1"))

48

future2 = pool.schedule(io_task, args=(2, "Task 2"))

49

50

# Get results

51

result1 = future1.result()

52

result2 = future2.result()

53

54

print(f"Results: {result1}, {result2}")

55

56

# Always clean up

57

pool.close()

58

pool.join()

59

```

60

61

### Task Scheduling

62

63

Schedule individual tasks for execution by worker threads:

64

65

```python { .api }

66

def schedule(

67

self,

68

function: Callable,

69

args: tuple = (),

70

kwargs: dict = {}

71

) -> concurrent.futures.Future:

72

"""

73

Schedule a function for execution in the thread pool.

74

75

Parameters:

76

- function: The function to execute

77

- args: Positional arguments to pass to function

78

- kwargs: Keyword arguments to pass to function

79

80

Returns:

81

concurrent.futures.Future object for retrieving the result

82

"""

83

84

def submit(

85

self,

86

function: Callable,

87

*args,

88

**kwargs

89

) -> concurrent.futures.Future:

90

"""

91

Submit a function for execution (compatibility with concurrent.futures).

92

93

Parameters:

94

- function: The function to execute

95

- args: Positional arguments to pass to function

96

- kwargs: Keyword arguments to pass to function

97

98

Returns:

99

concurrent.futures.Future object for retrieving the result

100

"""

101

```

102

103

#### Usage Examples

104

105

```python

106

from pebble import ThreadPool

107

import requests

108

import time

109

110

def fetch_url(url, timeout=10):

111

response = requests.get(url, timeout=timeout)

112

return {"url": url, "status": response.status_code, "size": len(response.content)}

113

114

def process_data(data, multiplier=1):

115

# Simulate data processing

116

time.sleep(0.1)

117

return [x * multiplier for x in data]

118

119

# Create and use thread pool

120

with ThreadPool(max_workers=8) as pool:

121

# Schedule multiple HTTP requests

122

urls = [

123

"https://httpbin.org/delay/1",

124

"https://httpbin.org/delay/2",

125

"https://httpbin.org/json",

126

"https://httpbin.org/user-agent"

127

]

128

129

# Using schedule method

130

fetch_futures = []

131

for url in urls:

132

future = pool.schedule(fetch_url, args=(url,), kwargs={"timeout": 5})

133

fetch_futures.append(future)

134

135

# Using submit method (concurrent.futures style)

136

data_futures = []

137

datasets = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

138

for dataset in datasets:

139

future = pool.submit(process_data, dataset, multiplier=2)

140

data_futures.append(future)

141

142

# Collect results

143

fetch_results = [f.result() for f in fetch_futures]

144

data_results = [f.result() for f in data_futures]

145

146

print("Fetch results:", fetch_results)

147

print("Data results:", data_results)

148

```

149

150

### Bulk Operations with Map

151

152

Execute a function across multiple inputs using the map interface:

153

154

```python { .api }

155

def map(

156

self,

157

function: Callable,

158

*iterables,

159

chunksize: int = None,

160

timeout: float = None

161

) -> MapFuture:

162

"""

163

Apply function to every item of iterables in parallel.

164

165

Parameters:

166

- function: Function to apply to each item

167

- iterables: One or more iterables to process

168

- chunksize: Number of items per chunk (None for automatic sizing)

169

- timeout: Maximum time to wait for all results

170

171

Returns:

172

MapFuture object; its result() method returns an iterator that yields results as they become available

173

"""

174

```

175

176

#### Map Usage Examples

177

178

```python

179

from pebble import ThreadPool

180

import math

181

import time

182

183

def compute_sqrt(x):

184

time.sleep(0.01) # Simulate some work

185

return math.sqrt(x)

186

187

def fetch_user_data(user_id):

188

# Simulate API call

189

time.sleep(0.1)

190

return {"id": user_id, "name": f"User {user_id}", "active": user_id % 2 == 0}

191

192

# Using map for batch processing

193

with ThreadPool(max_workers=6) as pool:

194

# Process numbers

195

numbers = range(100)

196

sqrt_results = pool.map(compute_sqrt, numbers, chunksize=10)

197

198

# Process as results become available

199

print("Square roots:")

200

for i, result in enumerate(sqrt_results.result()):

201

print(f"sqrt({i}) = {result:.3f}")

202

203

# Fetch user data for multiple users

204

user_ids = range(1, 21)

205

user_futures = pool.map(fetch_user_data, user_ids, timeout=30)

206

207

# Get all results at once

208

users = list(user_futures.result())

209

active_users = [user for user in users if user["active"]]

210

print(f"Active users: {len(active_users)}/{len(users)}")

211

```

212

213

### Worker Initialization

214

215

Initialize worker threads with shared resources or configuration:

216

217

```python

218

from pebble import ThreadPool

219

import logging

220

import threading

221

222

# Setup function for each worker thread

223

def worker_init(db_config, log_level):

224

# Configure logging for this thread

225

logging.basicConfig(level=log_level)

226

logger = logging.getLogger(f"worker-{threading.current_thread().ident}")

227

228

# Initialize database connection (stored in thread-local storage)

229

thread_local = threading.local()  # NOTE: for other functions (e.g. process_record) to see these attributes, this must be a single module-level threading.local() shared across functions, not one created inside the initializer

230

thread_local.db_connection = create_db_connection(db_config)

231

thread_local.logger = logger

232

233

logger.info("Worker thread initialized")

234

235

def create_db_connection(config):

236

# Simulate database connection

237

return {"host": config["host"], "connected": True}

238

239

def process_record(record_id):

240

# Access thread-local resources

241

thread_local = threading.local()  # NOTE: a freshly created local() has no attributes, so the hasattr check below always fails; reuse the one module-level threading.local() object populated by worker_init instead

242

if hasattr(thread_local, 'logger'):

243

thread_local.logger.info(f"Processing record {record_id}")

244

245

# Simulate work with database

246

time.sleep(0.1)

247

return f"Processed {record_id}"

248

249

# Create pool with worker initialization

250

db_config = {"host": "localhost", "port": 5432}

251

pool = ThreadPool(

252

max_workers=4,

253

initializer=worker_init,

254

initargs=(db_config, logging.INFO)

255

)

256

257

try:

258

# Schedule work

259

records = range(1, 11)

260

futures = [pool.schedule(process_record, args=(record_id,)) for record_id in records]

261

262

# Get results

263

results = [f.result() for f in futures]

264

print("Results:", results)

265

266

finally:

267

pool.close()

268

pool.join()

269

```

270

271

### Pool Lifecycle Management

272

273

Properly manage pool resources and handle shutdown:

274

275

```python { .api }

276

def close(self):

277

"""

278

Prevent new tasks from being submitted to the pool.

279

Currently running tasks will continue to completion.

280

"""

281

282

def stop(self):

283

"""

284

Stop the pool immediately, cancelling pending tasks.

285

Running tasks may be interrupted.

286

"""

287

288

def join(self, timeout: float = None):

289

"""

290

Wait for all worker threads to complete.

291

292

Parameters:

293

- timeout: Maximum time to wait in seconds (None for no timeout)

294

"""

295

```

296

297

#### Lifecycle Management Examples

298

299

```python

300

from pebble import ThreadPool

301

import time

302

import signal

303

import sys

304

305

def long_running_task(task_id, duration):

306

print(f"Task {task_id} starting (duration: {duration}s)")

307

time.sleep(duration)

308

print(f"Task {task_id} completed")

309

return f"Result {task_id}"

310

311

# Graceful shutdown example

312

def graceful_shutdown_demo():

313

pool = ThreadPool(max_workers=3)

314

315

try:

316

# Schedule some long-running tasks

317

futures = []

318

for i in range(5):

319

future = pool.schedule(long_running_task, args=(i, 2))

320

futures.append(future)

321

322

# Simulate running for a while

323

time.sleep(3)

324

325

print("Initiating graceful shutdown...")

326

pool.close() # No new tasks accepted

327

328

# Wait for completion with timeout

329

pool.join(timeout=10)

330

331

# Collect results from completed tasks

332

for i, future in enumerate(futures):

333

try:

334

result = future.result(timeout=0) # Don't wait

335

print(f"Task {i} result: {result}")

336

except Exception as e:

337

print(f"Task {i} failed or incomplete: {e}")

338

339

except KeyboardInterrupt:

340

print("Interrupt received, stopping pool...")

341

pool.stop() # Force stop

342

pool.join(timeout=5)

343

344

# Context manager usage (recommended)

345

def context_manager_demo():

346

with ThreadPool(max_workers=4) as pool:

347

# Pool is automatically closed and joined when exiting context

348

futures = []

349

for i in range(10):

350

future = pool.schedule(long_running_task, args=(i, 1))

351

futures.append(future)

352

353

# Get results

354

results = [f.result() for f in futures]

355

print("All tasks completed:", len(results))

356

357

print("Pool automatically cleaned up")

358

359

# Signal handling for clean shutdown

360

def signal_handler(signum, frame):

361

print(f"Signal {signum} received, shutting down...")

362

# Handle pool cleanup here

363

sys.exit(0)

364

365

signal.signal(signal.SIGINT, signal_handler)

366

signal.signal(signal.SIGTERM, signal_handler)

367

368

# Run examples

369

graceful_shutdown_demo()

370

context_manager_demo()

371

```

372

373

### Advanced Configuration

374

375

Configure pools for specific use cases and performance requirements:

376

377

```python

378

from pebble import ThreadPool

379

import time

380

381

# High-throughput pool with worker recycling

382

high_throughput_pool = ThreadPool(

383

max_workers=20, # Many workers for high concurrency

384

max_tasks=100 # Recycle workers every 100 tasks

385

)

386

387

# Resource-constrained pool

388

constrained_pool = ThreadPool(

389

max_workers=2, # Limited workers for resource constraints

390

max_tasks=0 # No worker recycling

391

)

392

393

# Specialized pool with custom initialization

394

def init_worker_with_cache():

395

import threading

396

thread_local = threading.local()  # NOTE: this object is discarded when the initializer returns; to share state with task functions, create one threading.local() at module level and assign its attributes here

397

thread_local.cache = {}

398

thread_local.request_count = 0

399

400

def cached_operation(key, value):

401

import threading

402

thread_local = threading.local()  # NOTE: creating a new local() on every call means the cache below never persists between calls; use a single module-level threading.local() so repeated keys actually hit the cache

403

404

if not hasattr(thread_local, 'cache'):

405

thread_local.cache = {}

406

thread_local.request_count = 0

407

408

thread_local.request_count += 1

409

410

if key in thread_local.cache:

411

return thread_local.cache[key]

412

413

# Simulate expensive operation

414

time.sleep(0.1)

415

result = value * 2

416

thread_local.cache[key] = result

417

418

return result

419

420

# Use specialized pool

421

specialized_pool = ThreadPool(

422

max_workers=4,

423

initializer=init_worker_with_cache

424

)

425

426

try:

427

# Test caching behavior

428

test_data = [(f"key_{i % 5}", i) for i in range(20)] # Repeated keys

429

futures = [

430

specialized_pool.schedule(cached_operation, args=(key, value))

431

for key, value in test_data

432

]

433

434

results = [f.result() for f in futures]

435

print(f"Processed {len(results)} items with caching")

436

437

finally:

438

specialized_pool.close()

439

specialized_pool.join()

440

```