Backport of the concurrent.futures package from Python 3 for Python 2.

Utility functions for coordinating and managing multiple Future objects.
These functions provide powerful patterns for waiting on multiple
asynchronous operations and processing results as they become available.

wait(): waits for Future objects to complete based on specified conditions
and returns the completed and pending futures.
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """
    Wait for futures to complete based on specified conditions.

    Parameters:
    - fs (iterable): Sequence of Future objects to wait for
    - timeout (float, optional): Maximum time to wait in seconds
    - return_when (str): Condition for when to return:
        - ALL_COMPLETED: Wait for all futures to complete (default)
        - FIRST_COMPLETED: Return when any future completes
        - FIRST_EXCEPTION: Return when any future raises an exception

    Returns:
        DoneAndNotDoneFutures: Named tuple with 'done' and 'not_done' sets

    Note: The 'done' set contains completed futures, 'not_done' contains
    pending futures.
    """


# Basic wait usage:
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import time


def task(n, delay):
    """Sleep for *delay* seconds, then return a completion message."""
    time.sleep(delay)
    return f"Task {n} completed"


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Submit multiple tasks
        futures_list = [
            executor.submit(task, 1, 0.5),
            executor.submit(task, 2, 1.0),
            executor.submit(task, 3, 0.3),
        ]

        # Wait for all to complete (ALL_COMPLETED is the default)
        done, not_done = wait(futures_list)
        print(f"Completed: {len(done)}")  # 3
        print(f"Pending: {len(not_done)}")  # 0

        # Get all results
        for future in done:
            print(future.result())

# Wait with timeout:
# Demo: wait() with a timeout, then wait again for the stragglers.
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures_list = [
            executor.submit(task, 1, 0.5),
            executor.submit(task, 2, 2.0),  # Long-running task
        ]

        # Wait maximum 1 second
        done, not_done = wait(futures_list, timeout=1.0)
        print(f"Completed in 1s: {len(done)}")  # 1
        print(f"Still running: {len(not_done)}")  # 1

        # Process completed futures
        for future in done:
            print(f"Quick result: {future.result()}")

        # Wait for remaining futures
        if not_done:
            final_done, _ = wait(not_done)
            for future in final_done:
                print(f"Slow result: {future.result()}")

# Return when first completes:
from concurrent.futures import FIRST_COMPLETED

# Demo: return as soon as the first future completes.
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures_list = [
            executor.submit(task, 1, 1.0),
            executor.submit(task, 2, 0.3),  # This will complete first
            executor.submit(task, 3, 2.0),
        ]

        # Return as soon as any future completes
        done, not_done = wait(futures_list, return_when=FIRST_COMPLETED)
        print(f"First completed: {len(done)}")  # 1
        print(f"Still running: {len(not_done)}")  # 2

        # Get the first result
        first_future = next(iter(done))
        print(f"First result: {first_future.result()}")

# Return when first exception occurs:
from concurrent.futures import FIRST_EXCEPTION


def failing_task(n):
    """Sleep 0.1*n seconds; raise ValueError when n == 2, else succeed."""
    import time
    time.sleep(0.1 * n)
    if n == 2:
        raise ValueError(f"Task {n} failed")
    return f"Task {n} succeeded"


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures_list = [
            executor.submit(failing_task, 1),
            executor.submit(failing_task, 2),  # Will fail
            executor.submit(failing_task, 3),
        ]

        # Return when first exception occurs
        done, not_done = wait(futures_list, return_when=FIRST_EXCEPTION)

        # Check results
        for future in done:
            try:
                result = future.result()
                print(f"Success: {result}")
            except Exception as e:
                print(f"Exception: {e}")

# as_completed: returns an iterator that yields Future objects as they
# complete, regardless of order.
def as_completed(fs, timeout=None):
    """
    Return iterator over futures as they complete.

    Parameters:
    - fs (iterable): Sequence of Future objects to monitor
    - timeout (float, optional): Maximum total time for iteration

    Yields:
        Future: Futures in order of completion

    Raises:
        TimeoutError: If entire iteration cannot complete before timeout

    Note: Duplicate futures in input are yielded only once.
    """


# Basic as_completed usage:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def timed_task(n, delay):
    """Sleep for *delay* seconds, then report how long the task took."""
    time.sleep(delay)
    return f"Task {n} finished after {delay}s"


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Submit tasks with different delays; map each future to its id
        futures_dict = {
            executor.submit(timed_task, 1, 0.5): 1,
            executor.submit(timed_task, 2, 0.2): 2,  # Fastest
            executor.submit(timed_task, 3, 0.8): 3,
            executor.submit(timed_task, 4, 0.1): 4,  # Actually fastest
        }

        # Process results as they complete
        for future in as_completed(futures_dict.keys()):
            task_id = futures_dict[future]
            try:
                result = future.result()
                print(f"Task {task_id}: {result}")
            except Exception as e:
                print(f"Task {task_id} failed: {e}")
        # Output order will be: Task 4, Task 2, Task 1, Task 3

# as_completed with timeout:
# Demo: bound the total iteration time of as_completed().
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures_list = [
            executor.submit(timed_task, 1, 0.3),
            executor.submit(timed_task, 2, 0.6),
            executor.submit(timed_task, 3, 1.5),  # Too slow
        ]
        try:
            # Only wait 1 second total
            for future in as_completed(futures_list, timeout=1.0):
                result = future.result()
                print(f"Completed: {result}")
        except TimeoutError:
            print("Timeout exceeded - some futures may still be running")
            # Check what's still pending
            for future in futures_list:
                if not future.done():
                    print(f"Still running: {future}")

# Progress tracking with as_completed:
import time


def download_file(file_id, size):
    """Simulate downloading a *size* MB file; return a status message."""
    time.sleep(size * 0.1)  # Simulate download time
    return f"File {file_id} ({size}MB) downloaded"


# (file_id, size_mb) pairs
files_to_download = [
    (1, 5),
    (2, 12),
    (3, 3),
    (4, 8),
    (5, 15),
]

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Submit all download tasks
        future_to_file = {
            executor.submit(download_file, file_id, size): (file_id, size)
            for file_id, size in files_to_download
        }

        completed = 0
        total = len(future_to_file)

        # Show progress as downloads complete
        for future in as_completed(future_to_file.keys()):
            file_id, size = future_to_file[future]
            completed += 1
            try:
                result = future.result()
                print(f"[{completed}/{total}] {result}")
            except Exception as e:
                print(f"[{completed}/{total}] File {file_id} failed: {e}")

# Batch processing with as_completed:
import time


def process_batch(batch_id, items):
    """Uppercase every item in *items*; return the batch id and results."""
    time.sleep(len(items) * 0.1)  # Simulated processing time
    processed = [item.upper() for item in items]
    return {"batch_id": batch_id, "processed": processed}


# Split work into batches
all_items = ["apple", "banana", "cherry", "date", "elderberry",
             "fig", "grape", "honeydew", "kiwi", "lemon"]
batch_size = 3
batches = [all_items[i:i + batch_size]
           for i in range(0, len(all_items), batch_size)]

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Submit all batches
        batch_futures = [
            executor.submit(process_batch, i, batch)
            for i, batch in enumerate(batches)
        ]

        # Collect results as they complete
        all_processed = []
        for future in as_completed(batch_futures):
            try:
                result = future.result()
                all_processed.extend(result["processed"])
                print(f"Batch {result['batch_id']} completed")
            except Exception as e:
                print(f"Batch processing failed: {e}")
        print(f"All processed items: {all_processed}")


class DoneAndNotDoneFutures:
    """
    Named tuple returned by wait() function.

    Attributes:
    - done (set): Set of completed Future objects
    - not_done (set): Set of uncompleted Future objects
    """
    done = None      # set of Future objects
    not_done = None  # set of Future objects


# Wait condition constants for use with wait()
# Wait condition constants for use with wait()
FIRST_COMPLETED = 'FIRST_COMPLETED'  # Return when any future completes
FIRST_EXCEPTION = 'FIRST_EXCEPTION'  # Return when any future raises exception
ALL_COMPLETED = 'ALL_COMPLETED'      # Return when all futures complete (default)

# Combining wait() and as_completed():
def process_with_fallback(tasks):
    """Process tasks with timeout and fallback handling.

    Submits every task, collects whatever finishes within the first
    second, then drains the remaining futures (up to 5 seconds total)
    as they complete. Failed tasks are logged and skipped.

    NOTE(review): relies on a module-level `task_func` that is not
    defined in this file — confirm the caller's environment provides it.
    """
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures_list = [executor.submit(task_func, task) for task in tasks]

        # First, try to get some quick results
        done, not_done = wait(futures_list, timeout=1.0,
                              return_when=FIRST_COMPLETED)

        # Process any quick results
        quick_results = []
        for future in done:
            try:
                quick_results.append(future.result())
            except Exception as e:
                print(f"Quick task failed: {e}")

        # Continue processing remaining tasks as they complete
        if not_done:
            for future in as_completed(not_done, timeout=5.0):
                try:
                    quick_results.append(future.result())
                except Exception as e:
                    print(f"Slow task failed: {e}")

        return quick_results


# Race condition handling:
def first_successful_result(tasks, max_workers=3):
    """Return the first successful result; cancel all other tasks.

    Raises RuntimeError when every task fails. On TimeoutError, cancels
    everything still pending and re-raises.

    NOTE(review): relies on a module-level `task_func` that is not
    defined in this file — confirm the caller's environment provides it.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures_list = [executor.submit(task_func, task) for task in tasks]
        try:
            for future in as_completed(futures_list):
                try:
                    result = future.result()
                    # Got first successful result - cancel others
                    for f in futures_list:
                        if f != future:
                            f.cancel()
                    return result
                except Exception:
                    continue  # Try next future
            raise RuntimeError("All tasks failed")
        except TimeoutError:
            # Cancel all if timeout
            for future in futures_list:
                future.cancel()
            raise


# Install with Tessl CLI:
#   npx tessl i tessl/pypi-futures