# Thread Pools

Managed pools of worker threads for executing multiple tasks concurrently. Thread pools are ideal for I/O-bound tasks and provide fine-grained control over worker lifecycle, task scheduling, and resource management. They offer better resource utilization and management compared to creating individual threads.

## Capabilities

### ThreadPool Class

A managed pool of worker threads that can execute multiple tasks concurrently with automatic worker lifecycle management, task queuing, and optional worker restart capabilities.

```python { .api }
class ThreadPool:
    def __init__(
        self,
        max_workers: int = multiprocessing.cpu_count(),
        max_tasks: int = 0,
        initializer: Callable = None,
        initargs: tuple = ()
    ):
        """
        Create a thread pool for concurrent task execution.

        Parameters:
        - max_workers: Maximum number of worker threads (defaults to CPU count)
        - max_tasks: Maximum tasks per worker before restart (0 = no limit)
        - initializer: Function called when each worker thread starts
        - initargs: Arguments passed to initializer function
        """
```

#### Basic Usage

```python
from pebble import ThreadPool
import time

# Create pool with default settings
pool = ThreadPool()

# Create pool with custom configuration
pool = ThreadPool(max_workers=4, max_tasks=10)

def io_task(duration, message):
    time.sleep(duration)
    return f"Completed: {message}"

# Schedule tasks
future1 = pool.schedule(io_task, args=(1, "Task 1"))
future2 = pool.schedule(io_task, args=(2, "Task 2"))

# Get results
result1 = future1.result()
result2 = future2.result()

print(f"Results: {result1}, {result2}")

# Always clean up
pool.close()
pool.join()
```

### Task Scheduling

Schedule individual tasks for execution by worker threads:

```python { .api }
def schedule(
    self,
    function: Callable,
    args: tuple = (),
    kwargs: dict = {}
) -> concurrent.futures.Future:
    """
    Schedule a function for execution in the thread pool.

    Parameters:
    - function: The function to execute
    - args: Positional arguments to pass to function
    - kwargs: Keyword arguments to pass to function

    Returns:
    concurrent.futures.Future object for retrieving the result
    """

def submit(
    self,
    function: Callable,
    *args,
    **kwargs
) -> concurrent.futures.Future:
    """
    Submit a function for execution (compatibility with concurrent.futures).

    Parameters:
    - function: The function to execute
    - args: Positional arguments to pass to function
    - kwargs: Keyword arguments to pass to function

    Returns:
    concurrent.futures.Future object for retrieving the result
    """
```

#### Usage Examples

```python
from pebble import ThreadPool
import requests
import time

def fetch_url(url, timeout=10):
    response = requests.get(url, timeout=timeout)
    return {"url": url, "status": response.status_code, "size": len(response.content)}

def process_data(data, multiplier=1):
    # Simulate data processing
    time.sleep(0.1)
    return [x * multiplier for x in data]

# Create and use thread pool
with ThreadPool(max_workers=8) as pool:
    # Schedule multiple HTTP requests
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/json",
        "https://httpbin.org/user-agent"
    ]

    # Using schedule method
    fetch_futures = []
    for url in urls:
        future = pool.schedule(fetch_url, args=(url,), kwargs={"timeout": 5})
        fetch_futures.append(future)

    # Using submit method (concurrent.futures style)
    data_futures = []
    datasets = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    for dataset in datasets:
        future = pool.submit(process_data, dataset, multiplier=2)
        data_futures.append(future)

    # Collect results
    fetch_results = [f.result() for f in fetch_futures]
    data_results = [f.result() for f in data_futures]

    print("Fetch results:", fetch_results)
    print("Data results:", data_results)
```

### Bulk Operations with Map

Execute a function across multiple inputs using the map interface:

```python { .api }
def map(
    self,
    function: Callable,
    *iterables,
    chunksize: int = None,
    timeout: float = None
) -> MapFuture:
    """
    Apply function to every item of iterables in parallel.

    Parameters:
    - function: Function to apply to each item
    - iterables: One or more iterables to process
    - chunksize: Number of items per chunk (None for automatic sizing)
    - timeout: Maximum time to wait for all results

    Returns:
    MapFuture object whose result() yields results as they become available
    """
```

#### Map Usage Examples

```python
from pebble import ThreadPool
import math
import time

def compute_sqrt(x):
    time.sleep(0.01)  # Simulate some work
    return math.sqrt(x)

def fetch_user_data(user_id):
    # Simulate API call
    time.sleep(0.1)
    return {"id": user_id, "name": f"User {user_id}", "active": user_id % 2 == 0}

# Using map for batch processing
with ThreadPool(max_workers=6) as pool:
    # Process numbers
    numbers = range(100)
    sqrt_future = pool.map(compute_sqrt, numbers, chunksize=10)

    # Iterate the future's result() to process results as they become available
    print("Square roots:")
    for i, result in enumerate(sqrt_future.result()):
        print(f"sqrt({i}) = {result:.3f}")

    # Fetch user data for multiple users
    user_ids = range(1, 21)
    user_future = pool.map(fetch_user_data, user_ids, timeout=30)

    # Get all results at once
    users = list(user_future.result())
    active_users = [user for user in users if user["active"]]
    print(f"Active users: {len(active_users)}/{len(users)}")
```

### Worker Initialization

Initialize worker threads with shared resources or configuration:

```python
from pebble import ThreadPool
import logging
import threading
import time

# One module-level threading.local(); each thread sees its own attributes on it.
# (Creating a new threading.local() inside each function would discard the data
# stored by the initializer.)
thread_local = threading.local()

# Setup function for each worker thread
def worker_init(db_config, log_level):
    # Configure logging for this thread
    logging.basicConfig(level=log_level)
    logger = logging.getLogger(f"worker-{threading.current_thread().ident}")

    # Initialize database connection (stored in thread-local storage)
    thread_local.db_connection = create_db_connection(db_config)
    thread_local.logger = logger

    logger.info("Worker thread initialized")

def create_db_connection(config):
    # Simulate database connection
    return {"host": config["host"], "connected": True}

def process_record(record_id):
    # Access thread-local resources set up by worker_init
    if hasattr(thread_local, 'logger'):
        thread_local.logger.info(f"Processing record {record_id}")

    # Simulate work with database
    time.sleep(0.1)
    return f"Processed {record_id}"

# Create pool with worker initialization
db_config = {"host": "localhost", "port": 5432}
pool = ThreadPool(
    max_workers=4,
    initializer=worker_init,
    initargs=(db_config, logging.INFO)
)

try:
    # Schedule work
    records = range(1, 11)
    futures = [pool.schedule(process_record, args=(record_id,)) for record_id in records]

    # Get results
    results = [f.result() for f in futures]
    print("Results:", results)

finally:
    pool.close()
    pool.join()
```

### Pool Lifecycle Management

Properly manage pool resources and handle shutdown:

```python { .api }
def close(self):
    """
    Prevent new tasks from being submitted to the pool.
    Currently running tasks will continue to completion.
    """

def stop(self):
    """
    Stop the pool immediately, cancelling pending tasks.
    Running tasks may be interrupted.
    """

def join(self, timeout: float = None):
    """
    Wait for all worker threads to complete.

    Parameters:
    - timeout: Maximum time to wait in seconds (None for no timeout)
    """
```

#### Lifecycle Management Examples

```python
from pebble import ThreadPool
import time
import signal
import sys

def long_running_task(task_id, duration):
    print(f"Task {task_id} starting (duration: {duration}s)")
    time.sleep(duration)
    print(f"Task {task_id} completed")
    return f"Result {task_id}"

# Graceful shutdown example
def graceful_shutdown_demo():
    pool = ThreadPool(max_workers=3)

    try:
        # Schedule some long-running tasks
        futures = []
        for i in range(5):
            future = pool.schedule(long_running_task, args=(i, 2))
            futures.append(future)

        # Simulate running for a while
        time.sleep(3)

        print("Initiating graceful shutdown...")
        pool.close()  # No new tasks accepted

        # Wait for completion with timeout
        pool.join(timeout=10)

        # Collect results from completed tasks
        for i, future in enumerate(futures):
            try:
                result = future.result(timeout=0)  # Don't wait
                print(f"Task {i} result: {result}")
            except Exception as e:
                print(f"Task {i} failed or incomplete: {e}")

    except KeyboardInterrupt:
        print("Interrupt received, stopping pool...")
        pool.stop()  # Force stop
        pool.join(timeout=5)

# Context manager usage (recommended)
def context_manager_demo():
    with ThreadPool(max_workers=4) as pool:
        # Pool is automatically closed and joined when exiting context
        futures = []
        for i in range(10):
            future = pool.schedule(long_running_task, args=(i, 1))
            futures.append(future)

        # Get results
        results = [f.result() for f in futures]
        print("All tasks completed:", len(results))

    print("Pool automatically cleaned up")

# Signal handling for clean shutdown
def signal_handler(signum, frame):
    print(f"Signal {signum} received, shutting down...")
    # Handle pool cleanup here
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Run examples
graceful_shutdown_demo()
context_manager_demo()
```

### Advanced Configuration

Configure pools for specific use cases and performance requirements:

```python
from pebble import ThreadPool
import threading
import time

# High-throughput pool with worker recycling
high_throughput_pool = ThreadPool(
    max_workers=20,  # Many workers for high concurrency
    max_tasks=100    # Recycle workers every 100 tasks
)

# Resource-constrained pool
constrained_pool = ThreadPool(
    max_workers=2,   # Limited workers for resource constraints
    max_tasks=0      # No worker recycling
)

# One module-level threading.local(); each worker thread gets its own
# cache and request counter stored as attributes on it.
thread_local = threading.local()

# Specialized pool with custom initialization
def init_worker_with_cache():
    thread_local.cache = {}
    thread_local.request_count = 0

def cached_operation(key, value):
    # Defensive re-init in case the worker was not started with the initializer
    if not hasattr(thread_local, 'cache'):
        thread_local.cache = {}
        thread_local.request_count = 0

    thread_local.request_count += 1

    if key in thread_local.cache:
        return thread_local.cache[key]

    # Simulate expensive operation
    time.sleep(0.1)
    result = value * 2
    thread_local.cache[key] = result

    return result

# Use specialized pool
specialized_pool = ThreadPool(
    max_workers=4,
    initializer=init_worker_with_cache
)

try:
    # Test caching behavior
    test_data = [(f"key_{i % 5}", i) for i in range(20)]  # Repeated keys
    futures = [
        specialized_pool.schedule(cached_operation, args=(key, value))
        for key, value in test_data
    ]

    results = [f.result() for f in futures]
    print(f"Processed {len(results)} items with caching")

finally:
    specialized_pool.close()
    specialized_pool.join()
```