CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-futures

Backport of the concurrent.futures package from Python 3 for Python 2

Pending
Overview
Eval results
Files

utilities.mddocs/

Utility Functions

Utility functions for coordinating and managing multiple Future objects. These functions provide powerful patterns for waiting on multiple asynchronous operations and processing results as they become available.

Capabilities

wait Function

Waits for Future objects to complete based on specified conditions and returns completed and pending futures.

def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """
    Wait for futures to complete based on specified conditions.

    Parameters:
    - fs (iterable): Sequence of Future objects to wait for
    - timeout (float, optional): Maximum time to wait in seconds;
      None means wait indefinitely
    - return_when (str): Condition for when to return:
        - ALL_COMPLETED: Wait for all futures to complete (default)
        - FIRST_COMPLETED: Return when any future completes
        - FIRST_EXCEPTION: Return when any future raises an exception

    Returns:
    DoneAndNotDoneFutures: Named tuple with 'done' and 'not_done' sets

    Note: The 'done' set contains completed futures, 'not_done' contains
    pending futures. Expiry of `timeout` does NOT raise an exception --
    wait() simply returns with the unfinished futures in 'not_done'
    (see the timeout example below, which returns normally after 1 second).
    """

Usage Examples

Basic wait usage:

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import time

def task(n, delay):
    """Sleep for *delay* seconds, then report that task *n* finished."""
    time.sleep(delay)
    return f"Task {n} completed"

with ThreadPoolExecutor(max_workers=3) as executor:
    # Fan out three tasks with different simulated run times.
    futures_list = [executor.submit(task, n, delay)
                    for n, delay in ((1, 0.5), (2, 1.0), (3, 0.3))]

    # Block until every future has finished (ALL_COMPLETED is the default).
    done, not_done = wait(futures_list)

    print(f"Completed: {len(done)}")     # 3
    print(f"Pending: {len(not_done)}")   # 0

    # Every future is now done, so result() returns immediately.
    for future in done:
        print(future.result())
Wait with timeout:

with ThreadPoolExecutor(max_workers=2) as executor:
    quick_future = executor.submit(task, 1, 0.5)
    slow_future = executor.submit(task, 2, 2.0)   # Long-running task
    futures_list = [quick_future, slow_future]

    # Give the whole batch at most one second to finish.
    done, not_done = wait(futures_list, timeout=1.0)

    print(f"Completed in 1s: {len(done)}")      # 1
    print(f"Still running: {len(not_done)}")    # 1

    # Handle whatever finished inside the window first.
    for future in done:
        print(f"Quick result: {future.result()}")

    # Then block on the stragglers until they are done too.
    if not_done:
        final_done, _ = wait(not_done)
        for future in final_done:
            print(f"Slow result: {future.result()}")

Return when first completes:

from concurrent.futures import FIRST_COMPLETED

with ThreadPoolExecutor(max_workers=3) as executor:
    # Task 2 has the shortest delay, so it should finish first.
    delays = {1: 1.0, 2: 0.3, 3: 2.0}
    futures_list = [executor.submit(task, n, d) for n, d in delays.items()]

    # Unblock as soon as the first future finishes.
    done, not_done = wait(futures_list, return_when=FIRST_COMPLETED)

    print(f"First completed: {len(done)}")    # 1
    print(f"Still running: {len(not_done)}")  # 2

    # Pull the single finished future out of the 'done' set.
    first_future = next(iter(done))
    print(f"First result: {first_future.result()}")

Return when first exception occurs:

from concurrent.futures import FIRST_EXCEPTION

def failing_task(n):
    """Succeed for every n except 2, which raises ValueError."""
    import time
    time.sleep(0.1 * n)
    if n == 2:
        raise ValueError(f"Task {n} failed")
    return f"Task {n} succeeded"

with ThreadPoolExecutor(max_workers=3) as executor:
    futures_list = [executor.submit(failing_task, n) for n in (1, 2, 3)]  # n == 2 will fail

    # Stop waiting as soon as any future raises.
    done, not_done = wait(futures_list, return_when=FIRST_EXCEPTION)

    # result() re-raises a future's exception, so guard each call.
    for future in done:
        try:
            result = future.result()
            print(f"Success: {result}")
        except Exception as e:
            print(f"Exception: {e}")

as_completed Function

Returns an iterator that yields Future objects as they complete, regardless of the order in which they were submitted.

def as_completed(fs, timeout=None):
    """
    Return iterator over futures as they complete.

    Parameters:
    - fs (iterable): Sequence of Future objects to monitor
    - timeout (float, optional): Maximum total time for the whole
      iteration in seconds; None means no limit

    Yields:
    Future: Futures in order of completion (NOT the order they appear
    in fs -- see the examples below)

    Raises:
    TimeoutError: If entire iteration cannot complete before timeout.
    NOTE(review): this is concurrent.futures' TimeoutError; on the
    Python 2 backport it is not the same class as any builtin, so
    catch the imported name, not a bare builtin -- confirm against the
    installed package.

    Note: Duplicate futures in input are yielded only once
    """

Usage Examples

Basic as_completed usage:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def timed_task(n, delay):
    """Sleep for *delay* seconds and report how long task *n* took."""
    time.sleep(delay)
    return f"Task {n} finished after {delay}s"

with ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to its task id so results can be labelled.
    delays = {1: 0.5, 2: 0.2, 3: 0.8, 4: 0.1}
    futures_dict = {executor.submit(timed_task, n, d): n
                    for n, d in delays.items()}

    # Futures arrive here in completion order, not submission order.
    for future in as_completed(futures_dict.keys()):
        task_id = futures_dict[future]
        try:
            result = future.result()
            print(f"Task {task_id}: {result}")
        except Exception as e:
            print(f"Task {task_id} failed: {e}")

# Output order will be: Task 4, Task 2, Task 1, Task 3

as_completed with timeout:

# BUG FIX: a bare `except TimeoutError` catches the *builtin* exception, but
# as_completed() raises concurrent.futures.TimeoutError.  On the Python 2
# backport the bare name does not even exist, and on Python < 3.11 the two
# classes are distinct, so the handler below would never fire.  Import the
# correct class explicitly (on 3.11+ it aliases the builtin, so this is
# harmless everywhere).
from concurrent.futures import TimeoutError

with ThreadPoolExecutor(max_workers=3) as executor:
    futures_list = [
        executor.submit(timed_task, 1, 0.3),
        executor.submit(timed_task, 2, 0.6),
        executor.submit(timed_task, 3, 1.5)  # Too slow for the 1s budget
    ]

    try:
        # Only wait 1 second total for the whole iteration.
        for future in as_completed(futures_list, timeout=1.0):
            result = future.result()
            print(f"Completed: {result}")
    except TimeoutError:
        print("Timeout exceeded - some futures may still be running")

        # Check what's still pending
        for future in futures_list:
            if not future.done():
                print(f"Still running: {future}")

Progress tracking with as_completed:

import time

def download_file(file_id, size):
    """Pretend to download *size* MB as file *file_id* (sleeps to simulate)."""
    time.sleep(size * 0.1)  # Simulate download time
    return f"File {file_id} ({size}MB) downloaded"

# (file_id, size_mb) pairs to fetch.
files_to_download = [(1, 5), (2, 12), (3, 3), (4, 8), (5, 15)]

with ThreadPoolExecutor(max_workers=3) as executor:
    # Remember which (file_id, size) pair each future belongs to.
    future_to_file = {}
    for file_id, size in files_to_download:
        future_to_file[executor.submit(download_file, file_id, size)] = (file_id, size)

    completed = 0
    total = len(future_to_file)

    # Report each download the moment it lands, with a running counter.
    for future in as_completed(future_to_file.keys()):
        file_id, size = future_to_file[future]
        completed += 1

        try:
            result = future.result()
            print(f"[{completed}/{total}] {result}")
        except Exception as e:
            print(f"[{completed}/{total}] File {file_id} failed: {e}")

Batch processing with as_completed:

def process_batch(batch_id, items):
    """Uppercase every item in *items* and tag the result with *batch_id*."""
    time.sleep(len(items) * 0.1)  # Simulated processing time
    return {"batch_id": batch_id, "processed": [item.upper() for item in items]}

# Split work into batches of three.
all_items = ["apple", "banana", "cherry", "date", "elderberry",
             "fig", "grape", "honeydew", "kiwi", "lemon"]
batch_size = 3
batches = [all_items[start:start + batch_size]
           for start in range(0, len(all_items), batch_size)]

with ThreadPoolExecutor(max_workers=2) as executor:
    # One future per batch, tagged by its position in `batches`.
    batch_futures = [executor.submit(process_batch, i, batch)
                     for i, batch in enumerate(batches)]

    # Merge batch outputs in whatever order they finish.
    all_processed = []
    for future in as_completed(batch_futures):
        try:
            result = future.result()
            all_processed.extend(result["processed"])
            print(f"Batch {result['batch_id']} completed")
        except Exception as e:
            print(f"Batch processing failed: {e}")

    print(f"All processed items: {all_processed}")

Return Types

import collections

# wait() returns a real named tuple: 'done' and 'not_done' are readable as
# attributes AND unpackable positionally (done, not_done = wait(fs)), exactly
# as every example above does.  The plain class previously shown here could
# not be unpacked that way, so it misdocumented the return type.
#   done     -- set of completed Future objects
#   not_done -- set of uncompleted Future objects
DoneAndNotDoneFutures = collections.namedtuple(
    'DoneAndNotDoneFutures', ['done', 'not_done'])

Constants

# Wait condition constants for wait()'s return_when parameter.
FIRST_COMPLETED = 'FIRST_COMPLETED'   # Return when any future completes
FIRST_EXCEPTION = 'FIRST_EXCEPTION'   # Return when any future raises an exception
ALL_COMPLETED = 'ALL_COMPLETED'       # Return when all futures complete (default)

Advanced Patterns

Combining wait() and as_completed():

def process_with_fallback(tasks):
    """Process tasks concurrently with a quick-results phase and a deadline.

    Parameters:
    - tasks: iterable of work items handed to `task_func`
      (a placeholder the caller's module is expected to define)

    Returns:
    list of results from every task that completed within the deadlines;
    tasks that failed or timed out are skipped.
    """
    # The futures package raises its own TimeoutError; on Python 2 / <3.11
    # it is not the builtin class, so import it explicitly.
    from concurrent.futures import TimeoutError as FuturesTimeout

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures_list = [executor.submit(task_func, task) for task in tasks]

        # Phase 1: give the batch one second to produce at least one result.
        done, not_done = wait(futures_list, timeout=1.0, return_when=FIRST_COMPLETED)

        results = []
        for future in done:
            try:
                results.append(future.result())
            except Exception as e:
                print(f"Quick task failed: {e}")

        # Phase 2: drain the rest as they complete, but bail out after 5s.
        # BUG FIX: as_completed(timeout=...) raises TimeoutError once the
        # deadline passes; the original let it propagate, so the promised
        # "fallback handling" never happened.  Catch it and keep partial
        # results instead.
        if not_done:
            try:
                for future in as_completed(not_done, timeout=5.0):
                    try:
                        results.append(future.result())
                    except Exception as e:
                        print(f"Slow task failed: {e}")
            except FuturesTimeout:
                # Best effort: cancel anything still pending and return
                # whatever we managed to collect.
                for future in not_done:
                    future.cancel()

        return results

Racing tasks for the first successful result:

def first_successful_result(tasks, max_workers=3, timeout=None):
    """Run all tasks concurrently; return the first successful result.

    Once a winner is found the remaining futures are cancelled (only
    futures that have not started running can actually be cancelled).

    Parameters:
    - tasks: iterable of work items handed to `task_func`
      (a placeholder the caller's module is expected to define)
    - max_workers (int): thread pool size
    - timeout (float, optional): overall deadline in seconds.  New,
      backward-compatible parameter -- the default None preserves the old
      wait-forever behavior.

    Returns: the result of the first future that did not raise.

    Raises:
    - RuntimeError: if every task raised an exception
    - concurrent.futures.TimeoutError: if `timeout` elapses first
      (all futures are cancelled before re-raising)
    """
    # Catch the futures package's own TimeoutError: on Python 2 / <3.11 it
    # is not the builtin class, so a bare `except TimeoutError` would miss it.
    from concurrent.futures import TimeoutError as FuturesTimeout

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures_list = [executor.submit(task_func, task) for task in tasks]

        try:
            # BUG FIX: the original called as_completed() with no timeout,
            # which can never raise TimeoutError -- its `except TimeoutError`
            # branch was unreachable dead code.  Threading `timeout` through
            # makes that handler meaningful.
            for future in as_completed(futures_list, timeout=timeout):
                try:
                    result = future.result()
                except Exception:
                    continue  # this candidate failed -- try the next one
                # First success: cancel everything else.
                for other in futures_list:
                    if other is not future:  # identity, not equality
                        other.cancel()
                return result

            raise RuntimeError("All tasks failed")
        except FuturesTimeout:
            # Deadline hit: cancel whatever has not started and re-raise.
            for future in futures_list:
                future.cancel()
            raise

Install with Tessl CLI

npx tessl i tessl/pypi-futures

docs

executors.md

futures.md

index.md

utilities.md

tile.json