# Pooling

Tools for managing groups of greenlets, including pools with size limits, groups for coordination, and thread pools for CPU-bound or blocking operations. These provide structured concurrency management and resource control.

## Capabilities

### Greenlet Pools

Limited-size pools of greenlets for controlling concurrency.

```python { .api }
class Pool:
    """
    Pool of greenlets with maximum size limit.
    """

    def __init__(self, size=None, greenlet_class=None):
        """
        Create a pool.

        Parameters:
        - size: int, maximum pool size (None for unlimited)
        - greenlet_class: class, greenlet class to use
        """

    def spawn(self, function, *args, **kwargs) -> Greenlet:
        """
        Spawn function in pool, blocking if pool is full.

        Parameters:
        - function: callable to execute
        - *args: positional arguments
        - **kwargs: keyword arguments

        Returns:
        Greenlet object
        """

    def map(self, func, iterable):
        """
        Map function over iterable using pool.

        Parameters:
        - func: function to apply
        - iterable: sequence to process

        Returns:
        list of results in order
        """

    def imap(self, func, iterable, maxsize=None):
        """
        Lazy map function over iterable using pool.

        Parameters:
        - func: function to apply
        - iterable: sequence to process
        - maxsize: int, buffer size for results

        Yields:
        Results as they become available
        """

    def imap_unordered(self, func, iterable, maxsize=None):
        """
        Lazy map returning results in completion order.

        Parameters:
        - func: function to apply
        - iterable: sequence to process
        - maxsize: int, buffer size for results

        Yields:
        Results as they complete (unordered)
        """

    def apply(self, func, args=(), kwds={}):
        """
        Synchronously apply function with arguments.

        Parameters:
        - func: function to call
        - args: tuple, positional arguments
        - kwds: dict, keyword arguments

        Returns:
        Function result
        """

    def apply_async(self, func, args=(), kwds={}, callback=None):
        """
        Asynchronously apply function with arguments.

        Parameters:
        - func: function to call
        - args: tuple, positional arguments
        - kwds: dict, keyword arguments
        - callback: callable, called with result

        Returns:
        Greenlet object
        """

    def join(self, timeout=None, raise_error=False):
        """
        Wait for all greenlets in pool to finish.

        Parameters:
        - timeout: float, maximum time to wait
        - raise_error: bool, raise exception if any failed

        Returns:
        None
        """

    def kill(self, exception=GreenletExit, block=True, timeout=None):
        """
        Kill all greenlets in pool.

        Parameters:
        - exception: exception to raise in greenlets
        - block: bool, wait for greenlets to die
        - timeout: float, maximum time to wait

        Returns:
        None
        """

    @property
    def size(self) -> int:
        """Maximum number of greenlets allowed in the pool (the size passed to the constructor)."""

    def free_count(self) -> int:
        """Number of available slots in the pool."""

class PoolFull(Exception):
    """Exception raised when pool is full and cannot accept more greenlets."""
```
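
When results should be consumed as soon as each task finishes, rather than in submission order, `imap_unordered` pairs naturally with a size-limited pool. A minimal sketch, with illustrative host names and timings:

```python
import random
import gevent
from gevent import pool

def check_host(host):
    # Simulated probe with variable latency
    gevent.sleep(random.uniform(0.05, 0.3))
    return host, "ok"

hosts = [f"host{i}.example.com" for i in range(10)]
probe_pool = pool.Pool(4)  # at most 4 probes in flight at once

# Results are yielded in completion order, not submission order
for host, status in probe_pool.imap_unordered(check_host, hosts):
    print(f"{host}: {status}")
```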

### Greenlet Groups

Unbounded collections of greenlets for coordination.

```python { .api }
class Group:
    """
    Collection of greenlets for coordination without size limits.
    """

    def add(self, greenlet):
        """
        Add greenlet to group.

        Parameters:
        - greenlet: Greenlet object to add

        Returns:
        None
        """

    def discard(self, greenlet):
        """
        Remove greenlet from group.

        Parameters:
        - greenlet: Greenlet object to remove

        Returns:
        None
        """

    def spawn(self, function, *args, **kwargs) -> Greenlet:
        """
        Spawn greenlet and add to group.

        Parameters:
        - function: callable to execute
        - *args: positional arguments
        - **kwargs: keyword arguments

        Returns:
        Greenlet object
        """

    def map(self, func, iterable):
        """
        Map function over iterable using group greenlets.

        Parameters:
        - func: function to apply
        - iterable: sequence to process

        Returns:
        list of results in order
        """

    def imap(self, func, iterable):
        """
        Lazy map function over iterable.

        Parameters:
        - func: function to apply
        - iterable: sequence to process

        Yields:
        Results as they become available
        """

    def join(self, timeout=None, raise_error=False):
        """
        Wait for all greenlets in group to finish.

        Parameters:
        - timeout: float, maximum time to wait
        - raise_error: bool, raise exception if any failed

        Returns:
        None
        """

    def kill(self, exception=GreenletExit, block=True, timeout=None):
        """
        Kill all greenlets in group.

        Parameters:
        - exception: exception to raise in greenlets
        - block: bool, wait for greenlets to die
        - timeout: float, maximum time to wait

        Returns:
        None
        """

    def killone(self, greenlet, exception=GreenletExit, block=True, timeout=None):
        """
        Kill one greenlet in group.

        Parameters:
        - greenlet: Greenlet object to kill
        - exception: exception to raise
        - block: bool, wait for greenlet to die
        - timeout: float, maximum time to wait

        Returns:
        None
        """
```
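
A `Group` can also adopt greenlets that were spawned elsewhere: `add` registers them so a single `join` or `kill` covers everything, and `discard` removes them again. A small sketch, with illustrative worker names and delays:

```python
import gevent
from gevent.pool import Group

def worker(name, delay):
    gevent.sleep(delay)
    return name

group = Group()

# Spawn directly through the group...
group.spawn(worker, "a", 0.2)

# ...or adopt a greenlet created with gevent.spawn()
external = gevent.spawn(worker, "b", 0.1)
group.add(external)

group.join()           # waits for both greenlets
print(external.value)  # -> "b"
```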

### Thread Pools

Pools of native OS threads for blocking or CPU-bound operations that must run outside the event loop. Note that pure-Python CPU-bound code is still subject to the GIL; threads help most with blocking calls and C extensions that release it.

```python { .api }
class ThreadPool:
    """
    Pool of OS threads for CPU-bound operations.
    """

    def __init__(self, maxsize, hub=None):
        """
        Create thread pool.

        Parameters:
        - maxsize: int, maximum number of threads
        - hub: Hub, hub to use for coordination
        """

    def spawn(self, func, *args, **kwargs) -> ThreadResult:
        """
        Execute function in thread pool.

        Parameters:
        - func: function to execute
        - *args: positional arguments
        - **kwargs: keyword arguments

        Returns:
        ThreadResult object
        """

    def apply(self, func, args=(), kwds={}):
        """
        Synchronously execute function in thread.

        Parameters:
        - func: function to execute
        - args: tuple, positional arguments
        - kwds: dict, keyword arguments

        Returns:
        Function result
        """

    def apply_async(self, func, args=(), kwds={}, callback=None):
        """
        Asynchronously execute function in thread.

        Parameters:
        - func: function to execute
        - args: tuple, positional arguments
        - kwds: dict, keyword arguments
        - callback: callable, called with result

        Returns:
        AsyncResult object
        """

    def kill(self):
        """
        Kill all threads in pool.

        Returns:
        None
        """

class ThreadResult:
    """
    Result object for thread pool operations.
    """

    def get(self, timeout=None):
        """
        Get the result, blocking if necessary.

        Parameters:
        - timeout: float, maximum time to wait

        Returns:
        Function result
        """

    def ready(self) -> bool:
        """
        Check if result is available.

        Returns:
        bool, True if result is ready
        """
```
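
The practical benefit of the thread pool is that a truly blocking call only suspends the greenlet that made it, not the whole event loop. A minimal sketch using `apply` with an un-patched `time.sleep` as a stand-in for a blocking call:

```python
import time
import gevent
from gevent.threadpool import ThreadPool

thread_pool = ThreadPool(2)

def heartbeat():
    # Keeps running while the blocking call executes on a worker thread
    for _ in range(4):
        print("event loop is still responsive")
        gevent.sleep(0.25)

ticker = gevent.spawn(heartbeat)

# time.sleep() here is the real blocking sleep (nothing is monkey-patched);
# running it via the pool lets other greenlets continue
thread_pool.apply(time.sleep, args=(1,))

ticker.join()
thread_pool.kill()
```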

## Usage Examples

### Pool for Concurrent Requests

```python
import gevent
from gevent import pool
import time

def fetch_url(url):
    # Simulate HTTP request
    print(f"Fetching {url}")
    gevent.sleep(1)  # Simulate network delay
    return f"Content from {url}"

# Create pool with max 5 concurrent operations
request_pool = pool.Pool(5)

urls = [f"http://example.com/page{i}" for i in range(20)]

# Process URLs with pool
start_time = time.time()
results = request_pool.map(fetch_url, urls)
end_time = time.time()

print(f"Processed {len(urls)} URLs in {end_time - start_time:.2f} seconds")
print(f"First result: {results[0]}")
```

### Group for Related Tasks

```python
import gevent
from gevent import pool

def worker(task_id, duration):
    print(f"Task {task_id} starting")
    gevent.sleep(duration)
    print(f"Task {task_id} completed")
    return f"Result {task_id}"

# Create group for related tasks
task_group = pool.Group()

# Add tasks to group
for i in range(5):
    task_group.spawn(worker, i, i * 0.5)

# Wait for all tasks
print("Waiting for all tasks to complete...")
task_group.join()
print("All tasks completed!")
```

### Thread Pool for CPU-Bound Work

```python
from gevent import threadpool
import math

def cpu_intensive_task(n):
    """CPU-intensive task; running it in the thread pool keeps the event loop responsive."""
    result = 0
    for i in range(n):
        result += math.sqrt(i)
    return result

# Create thread pool
thread_pool = threadpool.ThreadPool(4)

# Execute CPU-bound tasks
tasks = [100000, 200000, 300000, 400000]
results = []

for task in tasks:
    result = thread_pool.spawn(cpu_intensive_task, task)
    results.append(result)

# Get results
for i, result in enumerate(results):
    value = result.get()  # Blocks until complete
    print(f"Task {i}: {value:.2f}")

thread_pool.kill()
```

### Pool with Error Handling

```python
import gevent
from gevent import pool
import random

def unreliable_task(task_id):
    gevent.sleep(random.uniform(0.1, 0.5))
    if random.random() < 0.3:  # 30% chance of failure
        raise Exception(f"Task {task_id} failed")
    return f"Success {task_id}"

# Create pool
work_pool = pool.Pool(3)

# Process tasks with error handling
task_ids = range(10)
results = []

for task_id in task_ids:
    greenlet = work_pool.spawn(unreliable_task, task_id)
    results.append(greenlet)

# Check results
for i, greenlet in enumerate(results):
    try:
        result = greenlet.get()
        print(f"Task {i}: {result}")
    except Exception as e:
        print(f"Task {i} failed: {e}")
```
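
As an alternative to inspecting each greenlet, `join(raise_error=True)` (see the Pool API above) re-raises a failure in the waiting greenlet, giving a fail-fast flavor of the same pattern. A short standalone sketch:

```python
import random
import gevent
from gevent import pool

def unreliable_task(task_id):
    gevent.sleep(random.uniform(0.1, 0.5))
    if random.random() < 0.3:  # 30% chance of failure
        raise Exception(f"Task {task_id} failed")
    return f"Success {task_id}"

strict_pool = pool.Pool(3)
for task_id in range(10):
    strict_pool.spawn(unreliable_task, task_id)

try:
    # Re-raises a greenlet failure instead of requiring per-greenlet checks
    strict_pool.join(raise_error=True)
    print("All tasks succeeded")
except Exception as e:
    print(f"A task failed: {e}")
```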