CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-futures

Backport of the concurrent.futures package from Python 3 for Python 2

Pending
Overview
Eval results
Files

executors.mddocs/

Thread and Process Executors

Executor classes provide high-level interfaces for asynchronously executing callables using either threads or processes. Both executors inherit from the abstract Executor base class and provide the same interface with different underlying implementations.

Capabilities

ThreadPoolExecutor

Executes calls asynchronously using a pool of worker threads. Ideal for I/O-bound tasks and situations where you want to overlap I/O with computation rather than CPU-bound parallel processing.

class ThreadPoolExecutor:
    def __init__(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()):
        """
        Initialize ThreadPoolExecutor.
        
        Parameters:
        - max_workers (int, optional): Maximum number of threads. Default: (cpu_count() or 1) * 5
        - thread_name_prefix (str, optional): Name prefix for worker threads
        - initializer (callable, optional): Function called at start of each worker thread
        - initargs (tuple, optional): Arguments passed to initializer function
        """
    
    def submit(self, fn, *args, **kwargs):
        """
        Submit a callable to be executed asynchronously.
        
        Parameters:
        - fn (callable): Function to execute
        - *args: Positional arguments for fn
        - **kwargs: Keyword arguments for fn
        
        Returns:
        Future: Future representing the execution
        """
    
    def map(self, fn, *iterables, **kwargs):
        """
        Apply function to iterables in parallel.
        
        Parameters:
        - fn (callable): Function to apply to each element
        - *iterables: Iterables to process
        - timeout (float, optional): Maximum time to wait for all results
        
        Returns:
        iterator: Results in same order as input
        
        Raises:
        TimeoutError: If timeout exceeded
        """
    
    def shutdown(self, wait=True):
        """
        Clean up executor resources.
        
        Parameters:
        - wait (bool): Whether to wait for pending futures to complete
        """

Usage Examples

Basic ThreadPoolExecutor Usage:

from concurrent.futures import ThreadPoolExecutor
import time

def io_task(n):
    time.sleep(0.1)  # Simulate I/O
    return f"Task {n} completed"

# Context manager ensures proper cleanup
with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit individual tasks
    future1 = executor.submit(io_task, 1)
    future2 = executor.submit(io_task, 2)
    
    # Get results
    print(future1.result())  # "Task 1 completed"
    print(future2.result())  # "Task 2 completed"

Using map() for batch processing:

with ThreadPoolExecutor(max_workers=3) as executor:
    # Process multiple items in parallel
    results = list(executor.map(io_task, range(5)))
    print(results)  # ['Task 0 completed', 'Task 1 completed', ...]

Thread naming and initialization:

def init_worker():
    print(f"Worker {threading.current_thread().name} starting")

with ThreadPoolExecutor(
    max_workers=2, 
    thread_name_prefix='MyWorker',
    initializer=init_worker
) as executor:
    future = executor.submit(io_task, 1)
    result = future.result()

ProcessPoolExecutor

Executes calls asynchronously using a pool of worker processes. Best for CPU-bound tasks that can benefit from true parallelism, though it has known limitations on Python 2.

class ProcessPoolExecutor:
    def __init__(self, max_workers=None):
        """
        Initialize ProcessPoolExecutor.
        
        Parameters:
        - max_workers (int, optional): Maximum number of processes. Default: cpu_count()
        """
    
    def submit(self, fn, *args, **kwargs):
        """
        Submit a callable to be executed asynchronously.
        
        Parameters:
        - fn (callable): Function to execute (must be picklable)
        - *args: Positional arguments for fn (must be picklable)
        - **kwargs: Keyword arguments for fn (must be picklable)
        
        Returns:
        Future: Future representing the execution
        """
    
    def map(self, fn, *iterables, **kwargs):
        """
        Apply function to iterables in parallel across processes.
        
        Parameters:
        - fn (callable): Function to apply (must be picklable)
        - *iterables: Iterables to process (must be picklable)
        - timeout (float, optional): Maximum time to wait for all results
        
        Returns:
        iterator: Results in same order as input
        
        Raises:
        TimeoutError: If timeout exceeded
        """
    
    def shutdown(self, wait=True):
        """
        Clean up executor resources.
        
        Parameters:
        - wait (bool): Whether to wait for pending futures to complete
        """

Usage Examples

Basic ProcessPoolExecutor Usage:

from concurrent.futures import ProcessPoolExecutor
import math

def cpu_task(n):
    # CPU-intensive calculation
    return sum(math.sqrt(i) for i in range(n * 1000))

# Only use ProcessPoolExecutor for CPU-bound tasks
with ProcessPoolExecutor(max_workers=2) as executor:
    future1 = executor.submit(cpu_task, 100)
    future2 = executor.submit(cpu_task, 200)
    
    result1 = future1.result()
    result2 = future2.result()
    print(f"Results: {result1}, {result2}")

Important ProcessPoolExecutor Considerations:

# Functions and arguments must be picklable
def process_data(data_list):
    return [x * 2 for x in data_list]

# This works - function and arguments are picklable
with ProcessPoolExecutor() as executor:
    data = [1, 2, 3, 4, 5]
    future = executor.submit(process_data, data)
    result = future.result()  # [2, 4, 6, 8, 10]

Executor Base Class

Both executor classes inherit from this abstract base class:

class Executor:
    def submit(self, fn, *args, **kwargs):
        """Submit callable for execution. Returns Future."""
    
    def map(self, fn, *iterables, **kwargs):
        """Map function over iterables in parallel."""
    
    def shutdown(self, wait=True):
        """Clean up resources."""
    
    def __enter__(self):
        """Context manager entry."""
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with cleanup."""

Error Handling

BrokenExecutor Exceptions

class BrokenExecutor(RuntimeError):
    """Raised when executor becomes non-functional after severe failure."""

class BrokenThreadPool(BrokenExecutor):
    """Raised when ThreadPoolExecutor worker thread fails during initialization."""

Common Error Patterns

Handling executor errors:

from concurrent.futures import ThreadPoolExecutor, BrokenThreadPool

def failing_initializer():
    raise ValueError("Initialization failed")

try:
    with ThreadPoolExecutor(initializer=failing_initializer) as executor:
        future = executor.submit(lambda: "test")
        result = future.result()
except BrokenThreadPool as e:
    print(f"Thread pool broken: {e}")

Shutdown after exceptions:

executor = ThreadPoolExecutor(max_workers=2)
try:
    # Submit work
    future = executor.submit(some_function)
    result = future.result()
finally:
    # Always clean up
    executor.shutdown(wait=True)

Performance Considerations

  • ThreadPoolExecutor: Default worker count is (cpu_count() or 1) * 5, optimized for I/O-bound tasks
  • ProcessPoolExecutor: Default worker count is cpu_count(), optimized for CPU-bound tasks
  • Context Managers: Always use with statements or explicit shutdown() calls for proper cleanup
  • Task Granularity: Balance task size - too small increases overhead, too large reduces parallelism
  • Pickling Overhead: ProcessPoolExecutor requires pickling arguments and results, adding overhead

Install with Tessl CLI

npx tessl i tessl/pypi-futures

docs

executors.md

futures.md

index.md

utilities.md

tile.json