0
# Thread Pools
1
2
Managed pools of green threads for controlling concurrency levels and resource usage in high-throughput applications. Thread pools help prevent resource exhaustion while maximizing performance.
3
4
## Capabilities
5
6
### Green Thread Pool
7
8
A pool that manages a fixed number of green threads, providing controlled concurrency and resource management.
9
10
```python { .api }
11
class GreenPool:
12
"""
13
A pool of green threads with a maximum size constraint.
14
Manages concurrency levels and provides convenient methods for
15
spawning work across the pool.
16
"""
17
18
def __init__(self, size=1000):
19
"""
20
Create a green thread pool.
21
22
Parameters:
23
- size: int, maximum number of concurrent greenthreads
24
"""
25
26
def spawn(self, func, *args, **kwargs):
27
"""
28
Spawn a function in the pool, blocking if pool is full.
29
30
Parameters:
31
- func: callable to run in pool
32
- *args: positional arguments for func
33
- **kwargs: keyword arguments for func
34
35
Returns:
36
GreenThread instance for retrieving results
37
"""
38
39
def spawn_n(self, func, *args, **kwargs):
40
"""
41
Spawn a function in the pool without return value access.
42
43
Parameters:
44
- func: callable to run in pool
45
- *args: positional arguments for func
46
- **kwargs: keyword arguments for func
47
48
Returns:
49
None
50
"""
51
52
def waitall(self):
53
"""
54
Wait for all spawned greenthreads in the pool to complete.
55
56
Returns:
57
None
58
"""
59
60
def resize(self, new_size):
61
"""
62
Change the maximum size of the pool.
63
64
Parameters:
65
- new_size: int, new maximum pool size
66
67
Returns:
68
None
69
"""
70
71
def running(self):
72
"""
73
Get the number of currently running greenthreads.
74
75
Returns:
76
int: number of active greenthreads
77
"""
78
79
def free(self):
80
"""
81
Get the number of available slots in the pool.
82
83
Returns:
84
int: number of free slots (size - running)
85
"""
86
87
def starmap(self, func, iterable):
88
"""
89
Apply func to each item in iterable, spawning across the pool.
90
Similar to itertools.starmap but with green thread pool execution.
91
92
Parameters:
93
- func: callable to apply to each item
94
- iterable: iterable of argument tuples for func
95
96
Returns:
97
iterator of results
98
"""
99
100
def imap(self, func, *iterables):
101
"""
102
Apply func to items from iterables in parallel across the pool.
103
Similar to itertools.imap but with green thread pool execution.
104
105
Parameters:
106
- func: callable to apply to items
107
- *iterables: one or more iterables
108
109
Returns:
110
iterator of results in order
111
"""
112
```
113
114
### Green Pile
115
116
A data structure for managing I/O-related tasks that provides easy iteration over results as they become available.
117
118
```python { .api }
119
class GreenPile:
120
"""
121
A pile of green threads for I/O operations.
122
Results can be retrieved as they become available through iteration.
123
"""
124
125
def __init__(self, size_or_pool=1000):
126
"""
127
Create a green pile.
128
129
Parameters:
130
- size_or_pool: int (pool size) or GreenPool instance
131
"""
132
133
def spawn(self, func, *args, **kwargs):
134
"""
135
Spawn a function and add its result to the pile.
136
137
Parameters:
138
- func: callable to run
139
- *args: positional arguments for func
140
- **kwargs: keyword arguments for func
141
142
Returns:
143
GreenThread instance
144
"""
145
146
def __iter__(self):
147
"""
148
Iterate over results as they become available.
149
150
Returns:
151
iterator yielding results in completion order
152
"""
153
154
def next(self):
155
"""
156
Get the next available result.
157
158
Returns:
159
Next completed result
160
161
Raises:
162
StopIteration: when no more results available
163
"""
164
165
def __len__(self):
166
"""
167
Get the number of spawned tasks.
168
169
Returns:
170
int: number of tasks in the pile
171
"""
172
```
173
174
## Usage Examples
175
176
### Basic Pool Usage
177
178
```python
179
import eventlet
180
181
def worker(task_id, duration):
182
"""Simulate some work"""
183
print(f"Task {task_id} starting")
184
eventlet.sleep(duration)
185
print(f"Task {task_id} completed")
186
return f"Result {task_id}"
187
188
# Create a pool with max 5 concurrent greenthreads
189
pool = eventlet.GreenPool(5)
190
191
# Spawn multiple tasks
192
results = []
193
for i in range(10):
194
gt = pool.spawn(worker, i, 1.0)
195
results.append(gt)
196
197
# Wait for all tasks to complete
198
pool.waitall()
199
200
# Collect results
201
final_results = [gt.wait() for gt in results]
202
print(f"All results: {final_results}")
203
```
204
205
### Pool with Map Operations
206
207
```python
208
import eventlet
209
210
def fetch_url(url):
211
"""Simulate fetching a URL"""
212
# In real usage, would use eventlet.green.urllib or similar
213
eventlet.sleep(0.5) # Simulate network delay
214
return f"Content from {url}"
215
216
urls = [
217
"http://example.com/page1",
218
"http://example.com/page2",
219
"http://example.com/page3",
220
"http://example.com/page4",
221
"http://example.com/page5"
222
]
223
224
pool = eventlet.GreenPool(3) # Max 3 concurrent requests
225
226
# Use imap to process URLs and get results in order
227
for result in pool.imap(fetch_url, urls):
228
print(f"Got: {result}")
229
230
# Use starmap with argument tuples
231
def fetch_with_headers(url, headers):
232
eventlet.sleep(0.3)
233
return f"Content from {url} with headers {headers}"
234
235
url_header_pairs = [
236
("http://api.example.com/users", {"Authorization": "Bearer token1"}),
237
("http://api.example.com/posts", {"Authorization": "Bearer token2"}),
238
("http://api.example.com/comments", {"Authorization": "Bearer token3"})
239
]
240
241
for result in pool.starmap(fetch_with_headers, url_header_pairs):
242
print(f"API result: {result}")
243
```
244
245
### Green Pile Usage
246
247
```python
248
import eventlet
249
250
def fetch_data(source):
251
"""Simulate fetching data from different sources"""
252
import random
253
delay = random.uniform(0.1, 2.0) # Random delay
254
eventlet.sleep(delay)
255
return f"Data from {source} (took {delay:.2f}s)"
256
257
# Create a pile for managing multiple I/O operations
258
pile = eventlet.GreenPile(10)
259
260
# Spawn multiple data fetches
261
sources = ["database", "api", "cache", "file", "network"]
262
for source in sources:
263
pile.spawn(fetch_data, source)
264
265
# Process results as they become available (not in original order)
266
print("Processing results as they arrive:")
267
for result in pile:
268
print(f"Received: {result}")
269
270
print("All operations completed")
271
```
272
273
### Dynamic Pool Resizing
274
275
```python
276
import eventlet
277
278
def monitoring_task():
279
"""Simulate a monitoring task"""
280
eventlet.sleep(1.0)
281
return "Monitor check complete"
282
283
pool = eventlet.GreenPool(2) # Start with small pool
284
285
# Spawn some initial tasks
286
for i in range(3):
287
pool.spawn(monitoring_task)
288
289
print(f"Pool stats - Running: {pool.running()}, Free: {pool.free()}")
290
291
# Increase pool size based on demand
292
if pool.free() == 0:
293
print("Pool full, increasing size")
294
pool.resize(5)
295
print(f"After resize - Running: {pool.running()}, Free: {pool.free()}")
296
297
# Spawn more tasks with increased capacity
298
for i in range(3, 7):
299
pool.spawn(monitoring_task)
300
301
# Wait for completion
302
pool.waitall()
303
print("All monitoring tasks completed")
304
```
305
306
### Producer-Consumer with Pools
307
308
```python
309
import eventlet
310
311
def producer(pool, queue, num_items):
312
"""Produce items using a pool of workers"""
313
def create_item(item_id):
314
eventlet.sleep(0.1) # Simulate work
315
return f"item_{item_id}"
316
317
# Use pool to create items concurrently
318
for i in range(num_items):
319
gt = pool.spawn(create_item, i)
320
# Put the greenthread in queue so consumer can wait for result
321
queue.put(gt)
322
323
def consumer(queue, num_items):
324
"""Consume items as they're produced"""
325
for _ in range(num_items):
326
gt = queue.get() # Get the greenthread
327
item = gt.wait() # Wait for the actual result
328
print(f"Consumed: {item}")
329
330
# Set up producer-consumer with pool
331
production_pool = eventlet.GreenPool(5)
332
item_queue = eventlet.Queue()
333
334
# Start producer and consumer
335
eventlet.spawn(producer, production_pool, item_queue, 10)
336
eventlet.spawn(consumer, item_queue, 10)
337
338
# Let them run
339
eventlet.sleep(5)
340
```
341
342
## Pool Management Best Practices
343
344
### Choosing Pool Size
345
346
```python
347
import eventlet
348
349
# For I/O-bound tasks (network, disk)
350
io_pool = eventlet.GreenPool(100) # Higher concurrency
351
352
# For CPU-bound tasks or resource-limited operations
353
cpu_pool = eventlet.GreenPool(10) # Lower concurrency
354
355
# For database connections (limited by DB connection pool)
356
db_pool = eventlet.GreenPool(20) # Match DB pool size
357
```
358
359
### Error Handling in Pools
360
361
```python
362
import eventlet
363
364
def risky_task(task_id):
365
"""Task that might fail"""
366
if task_id % 3 == 0:
367
raise ValueError(f"Task {task_id} failed!")
368
return f"Success {task_id}"
369
370
pool = eventlet.GreenPool(5)
371
greenthreads = []
372
373
# Spawn tasks
374
for i in range(10):
375
gt = pool.spawn(risky_task, i)
376
greenthreads.append(gt)
377
378
# Collect results with error handling
379
results = []
380
for gt in greenthreads:
381
try:
382
result = gt.wait()
383
results.append(result)
384
except Exception as e:
385
print(f"Task failed: {e}")
386
results.append(None)
387
388
print(f"Successful results: {[r for r in results if r is not None]}")
389
```