0
# Utility Functions
1
2
Utility functions for coordinating and managing multiple Future objects. These functions provide powerful patterns for waiting on multiple asynchronous operations and processing results as they become available.
3
4
## Capabilities
5
6
### wait Function
7
8
Waits for Future objects to complete based on specified conditions and returns completed and pending futures.
9
10
```python { .api }
11
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
12
"""
13
Wait for futures to complete based on specified conditions.
14
15
Parameters:
16
- fs (iterable): Iterable of Future objects to wait for
17
- timeout (float, optional): Maximum time to wait in seconds
18
- return_when (str): Condition for when to return:
19
- ALL_COMPLETED: Wait for all futures to complete (default)
20
- FIRST_COMPLETED: Return when any future completes
21
- FIRST_EXCEPTION: Return when any future finishes by raising an exception; if no future raises, this is equivalent to ALL_COMPLETED
22
23
Returns:
24
DoneAndNotDoneFutures: Named tuple with 'done' and 'not_done' sets
25
26
Note: The 'done' set contains futures that completed (finished or were cancelled) before the wait returned; 'not_done' contains futures that are still pending or running
27
"""
28
```
29
30
#### Usage Examples
31
32
**Basic wait usage:**
33
34
```python
35
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
36
import time
37
38
def task(n, delay):
39
time.sleep(delay)
40
return f"Task {n} completed"
41
42
with ThreadPoolExecutor(max_workers=3) as executor:
43
# Submit multiple tasks
44
futures_list = [
45
executor.submit(task, 1, 0.5),
46
executor.submit(task, 2, 1.0),
47
executor.submit(task, 3, 0.3)
48
]
49
50
# Wait for all to complete
51
done, not_done = wait(futures_list)
52
53
print(f"Completed: {len(done)}") # 3
54
print(f"Pending: {len(not_done)}") # 0
55
56
# Get all results
57
for future in done:
58
print(future.result())
59
```
60
61
**Wait with timeout:**
62
63
```python
64
with ThreadPoolExecutor(max_workers=2) as executor:
65
futures_list = [
66
executor.submit(task, 1, 0.5),
67
executor.submit(task, 2, 2.0) # Long-running task
68
]
69
70
# Wait maximum 1 second
71
done, not_done = wait(futures_list, timeout=1.0)
72
73
print(f"Completed in 1s: {len(done)}") # 1
74
print(f"Still running: {len(not_done)}") # 1
75
76
# Process completed futures
77
for future in done:
78
print(f"Quick result: {future.result()}")
79
80
# Wait for remaining futures
81
if not_done:
82
final_done, _ = wait(not_done)
83
for future in final_done:
84
print(f"Slow result: {future.result()}")
85
```
86
87
**Return when first completes:**
88
89
```python
90
from concurrent.futures import FIRST_COMPLETED
91
92
with ThreadPoolExecutor(max_workers=3) as executor:
93
futures_list = [
94
executor.submit(task, 1, 1.0),
95
executor.submit(task, 2, 0.3), # This will complete first
96
executor.submit(task, 3, 2.0)
97
]
98
99
# Return as soon as any future completes
100
done, not_done = wait(futures_list, return_when=FIRST_COMPLETED)
101
102
print(f"First completed: {len(done)}") # 1
103
print(f"Still running: {len(not_done)}") # 2
104
105
# Get the first result
106
first_future = next(iter(done))
107
print(f"First result: {first_future.result()}")
108
```
109
110
**Return when first exception occurs:**
111
112
```python
113
from concurrent.futures import FIRST_EXCEPTION
114
115
def failing_task(n):
116
import time
117
time.sleep(0.1 * n)
118
if n == 2:
119
raise ValueError(f"Task {n} failed")
120
return f"Task {n} succeeded"
121
122
with ThreadPoolExecutor(max_workers=3) as executor:
123
futures_list = [
124
executor.submit(failing_task, 1),
125
executor.submit(failing_task, 2), # Will fail
126
executor.submit(failing_task, 3)
127
]
128
129
# Return when first exception occurs
130
done, not_done = wait(futures_list, return_when=FIRST_EXCEPTION)
131
132
# Check results
133
for future in done:
134
try:
135
result = future.result()
136
print(f"Success: {result}")
137
except Exception as e:
138
print(f"Exception: {e}")
139
```
140
141
### as_completed Function
142
143
Returns an iterator that yields Future objects as they complete (finished or cancelled), regardless of the order in which they were submitted.
144
145
```python { .api }
146
def as_completed(fs, timeout=None):
147
"""
148
Return iterator over futures as they complete.
149
150
Parameters:
151
- fs (iterable): Iterable of Future objects to monitor
152
- timeout (float, optional): Maximum total time for iteration
153
154
Yields:
155
Future: Futures in order of completion
156
157
Raises:
158
TimeoutError: Raised during iteration if not all futures have completed within `timeout` seconds of the original as_completed() call
159
160
Note: Duplicate futures in input are yielded only once
161
"""
162
```
163
164
#### Usage Examples
165
166
**Basic as_completed usage:**
167
168
```python
169
from concurrent.futures import ThreadPoolExecutor, as_completed
170
import time
171
172
def timed_task(n, delay):
173
time.sleep(delay)
174
return f"Task {n} finished after {delay}s"
175
176
with ThreadPoolExecutor(max_workers=4) as executor:
177
# Submit tasks with different delays
178
futures_dict = {
179
executor.submit(timed_task, 1, 0.5): 1,
180
executor.submit(timed_task, 2, 0.2): 2, # Second fastest
181
executor.submit(timed_task, 3, 0.8): 3,
182
executor.submit(timed_task, 4, 0.1): 4 # Actually fastest
183
}
184
185
# Process results as they complete
186
for future in as_completed(futures_dict.keys()):
187
task_id = futures_dict[future]
188
try:
189
result = future.result()
190
print(f"Task {task_id}: {result}")
191
except Exception as e:
192
print(f"Task {task_id} failed: {e}")
193
194
# Output order will be: Task 4, Task 2, Task 1, Task 3
195
```
196
197
**as_completed with timeout:**
198
199
```python
200
with ThreadPoolExecutor(max_workers=3) as executor:
201
futures_list = [
202
executor.submit(timed_task, 1, 0.3),
203
executor.submit(timed_task, 2, 0.6),
204
executor.submit(timed_task, 3, 1.5) # Too slow
205
]
206
207
try:
208
# Only wait 1 second total
209
for future in as_completed(futures_list, timeout=1.0):
210
result = future.result()
211
print(f"Completed: {result}")
212
except TimeoutError:
213
print("Timeout exceeded - some futures may still be running")
214
215
# Check what's still pending
216
for future in futures_list:
217
if not future.done():
218
print(f"Still running: {future}")
219
```
220
221
**Progress tracking with as_completed:**
222
223
```python
224
import time
225
226
def download_file(file_id, size):
227
"""Simulate file download"""
228
time.sleep(size * 0.1) # Simulate download time
229
return f"File {file_id} ({size}MB) downloaded"
230
231
files_to_download = [
232
(1, 5), # file_id, size_mb
233
(2, 12),
234
(3, 3),
235
(4, 8),
236
(5, 15)
237
]
238
239
with ThreadPoolExecutor(max_workers=3) as executor:
240
# Submit all download tasks
241
future_to_file = {
242
executor.submit(download_file, file_id, size): (file_id, size)
243
for file_id, size in files_to_download
244
}
245
246
completed = 0
247
total = len(future_to_file)
248
249
# Show progress as downloads complete
250
for future in as_completed(future_to_file.keys()):
251
file_id, size = future_to_file[future]
252
completed += 1
253
254
try:
255
result = future.result()
256
print(f"[{completed}/{total}] {result}")
257
except Exception as e:
258
print(f"[{completed}/{total}] File {file_id} failed: {e}")
259
```
260
261
**Batch processing with as_completed:**
262
263
```python
264
def process_batch(batch_id, items):
265
"""Process a batch of items"""
266
time.sleep(len(items) * 0.1) # Processing time
267
processed = [item.upper() for item in items]
268
return {"batch_id": batch_id, "processed": processed}
269
270
# Split work into batches
271
all_items = ["apple", "banana", "cherry", "date", "elderberry",
272
"fig", "grape", "honeydew", "kiwi", "lemon"]
273
batch_size = 3
274
batches = [all_items[i:i+batch_size] for i in range(0, len(all_items), batch_size)]
275
276
with ThreadPoolExecutor(max_workers=2) as executor:
277
# Submit all batches
278
batch_futures = [
279
executor.submit(process_batch, i, batch)
280
for i, batch in enumerate(batches)
281
]
282
283
# Collect results as they complete
284
all_processed = []
285
for future in as_completed(batch_futures):
286
try:
287
result = future.result()
288
all_processed.extend(result["processed"])
289
print(f"Batch {result['batch_id']} completed")
290
except Exception as e:
291
print(f"Batch processing failed: {e}")
292
293
print(f"All processed items: {all_processed}")
294
```
295
296
### Return Types
297
298
```python { .api }
299
class DoneAndNotDoneFutures:
300
"""
301
Named tuple returned by wait() function.
302
303
Attributes:
304
- done (set): Set of completed Future objects
305
- not_done (set): Set of uncompleted Future objects
306
"""
307
done = None # set of Future objects
308
not_done = None # set of Future objects
309
```
310
311
### Constants
312
313
```python { .api }
314
# Wait condition constants for use with wait()
315
FIRST_COMPLETED = 'FIRST_COMPLETED' # Return when any future completes
316
FIRST_EXCEPTION = 'FIRST_EXCEPTION' # Return when any future raises exception
317
ALL_COMPLETED = 'ALL_COMPLETED' # Return when all futures complete (default)
318
```
319
320
### Advanced Patterns
321
322
**Combining wait() and as_completed():**
323
324
```python
325
def process_with_fallback(tasks):
326
"""Process tasks with timeout and fallback handling"""
327
with ThreadPoolExecutor(max_workers=4) as executor:
328
futures_list = [executor.submit(task_func, task) for task in tasks]
329
330
# First, try to get some quick results
331
done, not_done = wait(futures_list, timeout=1.0, return_when=FIRST_COMPLETED)
332
333
# Process any quick results
334
quick_results = []
335
for future in done:
336
try:
337
quick_results.append(future.result())
338
except Exception as e:
339
print(f"Quick task failed: {e}")
340
341
# Continue processing remaining tasks as they complete
342
if not_done:
343
for future in as_completed(not_done, timeout=5.0):
344
try:
345
result = future.result()
346
quick_results.append(result)
347
except Exception as e:
348
print(f"Slow task failed: {e}")
349
350
return quick_results
351
```
352
353
**Racing tasks (first successful result wins):**
354
355
```python
356
def first_successful_result(tasks, max_workers=3):
357
"""Return the first successful result and cancel tasks that have not started yet (tasks already running cannot be cancelled)"""
358
with ThreadPoolExecutor(max_workers=max_workers) as executor:
359
futures_list = [executor.submit(task_func, task) for task in tasks]
360
361
try:
362
for future in as_completed(futures_list):
363
try:
364
result = future.result()
365
# Got first successful result - cancel others
366
for f in futures_list:
367
if f != future:
368
f.cancel()
369
return result
370
except Exception:
371
continue # Try next future
372
373
raise RuntimeError("All tasks failed")
374
except TimeoutError:
375
# Cancel all if timeout
376
for future in futures_list:
377
future.cancel()
378
raise
379
```