Backport of the concurrent.futures package from Python 3 for Python 2
—
Executor classes provide high-level interfaces for asynchronously executing callables using either threads or processes. Both executors inherit from the abstract Executor base class and provide the same interface with different underlying implementations.
Executes calls asynchronously using a pool of worker threads. Ideal for I/O-bound tasks and situations where you want to overlap I/O with computation rather than CPU-bound parallel processing.
class ThreadPoolExecutor:
def __init__(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()):
"""
Initialize ThreadPoolExecutor.
Parameters:
- max_workers (int, optional): Maximum number of threads. Default: (cpu_count() or 1) * 5
- thread_name_prefix (str, optional): Name prefix for worker threads
- initializer (callable, optional): Function called at start of each worker thread
- initargs (tuple, optional): Arguments passed to initializer function
"""
def submit(self, fn, *args, **kwargs):
"""
Submit a callable to be executed asynchronously.
Parameters:
- fn (callable): Function to execute
- *args: Positional arguments for fn
- **kwargs: Keyword arguments for fn
Returns:
Future: Future representing the execution
"""
def map(self, fn, *iterables, **kwargs):
"""
Apply function to iterables in parallel.
Parameters:
- fn (callable): Function to apply to each element
- *iterables: Iterables to process
- timeout (float, optional): Maximum time to wait for all results
Returns:
iterator: Results in same order as input
Raises:
TimeoutError: If timeout exceeded
"""
def shutdown(self, wait=True):
"""
Clean up executor resources.
Parameters:
- wait (bool): Whether to wait for pending futures to complete
"""Basic ThreadPoolExecutor Usage:
from concurrent.futures import ThreadPoolExecutor
import time
def io_task(n):
time.sleep(0.1) # Simulate I/O
return "Task {} completed".format(n)  # f-strings are Python 3.6+ only; this package targets Python 2
# Context manager ensures proper cleanup
with ThreadPoolExecutor(max_workers=4) as executor:
# Submit individual tasks
future1 = executor.submit(io_task, 1)
future2 = executor.submit(io_task, 2)
# Get results
print(future1.result()) # "Task 1 completed"
print(future2.result())  # "Task 2 completed"
Using map() for batch processing:
with ThreadPoolExecutor(max_workers=3) as executor:
# Process multiple items in parallel
results = list(executor.map(io_task, range(5)))
print(results)  # ['Task 0 completed', 'Task 1 completed', ...]
Thread naming and initialization:
import threading
def init_worker():
    print("Worker {} starting".format(threading.current_thread().name))
with ThreadPoolExecutor(
max_workers=2,
thread_name_prefix='MyWorker',
initializer=init_worker
) as executor:
future = executor.submit(io_task, 1)
result = future.result()
Executes calls asynchronously using a pool of worker processes. Best for CPU-bound tasks that can benefit from true parallelism, though it has known limitations on Python 2.
class ProcessPoolExecutor:
def __init__(self, max_workers=None):
"""
Initialize ProcessPoolExecutor.
Parameters:
- max_workers (int, optional): Maximum number of processes. Default: cpu_count()
"""
def submit(self, fn, *args, **kwargs):
"""
Submit a callable to be executed asynchronously.
Parameters:
- fn (callable): Function to execute (must be picklable)
- *args: Positional arguments for fn (must be picklable)
- **kwargs: Keyword arguments for fn (must be picklable)
Returns:
Future: Future representing the execution
"""
def map(self, fn, *iterables, **kwargs):
"""
Apply function to iterables in parallel across processes.
Parameters:
- fn (callable): Function to apply (must be picklable)
- *iterables: Iterables to process (must be picklable)
- timeout (float, optional): Maximum time to wait for all results
Returns:
iterator: Results in same order as input
Raises:
TimeoutError: If timeout exceeded
"""
def shutdown(self, wait=True):
"""
Clean up executor resources.
Parameters:
- wait (bool): Whether to wait for pending futures to complete
"""Basic ProcessPoolExecutor Usage:
from concurrent.futures import ProcessPoolExecutor
import math
def cpu_task(n):
# CPU-intensive calculation
return sum(math.sqrt(i) for i in range(n * 1000))
# Only use ProcessPoolExecutor for CPU-bound tasks
with ProcessPoolExecutor(max_workers=2) as executor:
future1 = executor.submit(cpu_task, 100)
future2 = executor.submit(cpu_task, 200)
result1 = future1.result()
result2 = future2.result()
print("Results: {}, {}".format(result1, result2))
Important ProcessPoolExecutor Considerations:
# Functions and arguments must be picklable
def process_data(data_list):
return [x * 2 for x in data_list]
# This works - function and arguments are picklable
with ProcessPoolExecutor() as executor:
data = [1, 2, 3, 4, 5]
future = executor.submit(process_data, data)
result = future.result()  # [2, 4, 6, 8, 10]
Both executor classes inherit from this abstract base class:
class Executor:
def submit(self, fn, *args, **kwargs):
"""Submit callable for execution. Returns Future."""
def map(self, fn, *iterables, **kwargs):
"""Map function over iterables in parallel."""
def shutdown(self, wait=True):
"""Clean up resources."""
def __enter__(self):
"""Context manager entry."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit with cleanup."""class BrokenExecutor(RuntimeError):
"""Raised when executor becomes non-functional after severe failure."""
class BrokenThreadPool(BrokenExecutor):
"""Raised when ThreadPoolExecutor worker thread fails during initialization."""Handling executor errors:
from concurrent.futures import ThreadPoolExecutor, BrokenThreadPool
def failing_initializer():
raise ValueError("Initialization failed")
try:
with ThreadPoolExecutor(initializer=failing_initializer) as executor:
future = executor.submit(lambda: "test")
result = future.result()
except BrokenThreadPool as e:
print("Thread pool broken: {}".format(e))
Shutdown after exceptions:
executor = ThreadPoolExecutor(max_workers=2)
try:
# Submit work
future = executor.submit(some_function)
result = future.result()
finally:
# Always clean up
executor.shutdown(wait=True)
Defaults and recommendations:
- ThreadPoolExecutor default max_workers: (cpu_count() or 1) * 5, optimized for I/O-bound tasks
- ProcessPoolExecutor default max_workers: cpu_count(), optimized for CPU-bound tasks
- Use with statements or explicit shutdown() calls for proper cleanup
Install with Tessl CLI:
npx tessl i tessl/pypi-futures