CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-tqdm

Fast, extensible progress meter for loops and iterators in Python

Overview
Eval results
Files

docs/parallel.md

Parallel Processing

Utilities for parallel processing with automatic progress tracking. These functions provide drop-in replacements for standard parallel processing patterns while adding progress bars and proper resource management.

Capabilities

Thread-Based Parallel Processing

High-level interface for thread-based parallel execution with automatic progress tracking and resource management.

from tqdm.contrib.concurrent import thread_map, ensure_lock

def thread_map(fn, *iterables, max_workers=None, chunksize=1, **tqdm_kwargs):
    """
    Parallel mapping using ThreadPoolExecutor with progress tracking.

    Equivalent to concurrent.futures.ThreadPoolExecutor().map() but with
    a tqdm progress bar. Suitable for I/O-bound tasks, where threads can
    overlap blocking waits.

    Parameters:
    - fn: Function to apply to each element
    - *iterables: One or more iterables to process; with several iterables,
      fn receives one element from each (as with built-in map)
    - max_workers: Maximum number of threads (default: min(32, cpu_count + 4))
    - chunksize: Size of chunks for batching (default: 1)
    - **tqdm_kwargs: Additional arguments passed to tqdm constructor
      (e.g. desc, unit, leave)

    Returns:
    List of results in same order as input
    """

def ensure_lock(tqdm_class, lock_name=""):
    """
    Context manager ensuring proper thread locking for progress bars.

    Wrap manual progress-bar updates made from multiple threads in this
    context so bar rendering stays consistent.

    Parameters:
    - tqdm_class: tqdm class to use for locking
    - lock_name: Optional lock identifier for debugging

    Yields:
    Context with guaranteed thread-safe progress bar operations
    """

Process-Based Parallel Processing

High-level interface for process-based parallel execution with progress tracking, suitable for CPU-intensive tasks.

from tqdm.contrib.concurrent import process_map

def process_map(fn, *iterables, max_workers=None, chunksize=1, **tqdm_kwargs):
    """
    Parallel mapping using ProcessPoolExecutor with progress tracking.

    Equivalent to concurrent.futures.ProcessPoolExecutor().map() but with
    a tqdm progress bar. Suitable for CPU-bound tasks, since separate
    processes are not limited by the GIL.

    Parameters:
    - fn: Function to apply to each element (must be picklable, so it
      cannot be a lambda or a nested function)
    - *iterables: One or more iterables to process; with several iterables,
      fn receives one element from each (as with built-in map)
    - max_workers: Maximum number of processes (default: cpu_count)
    - chunksize: Size of chunks for batching (default: 1); larger chunks
      reduce inter-process dispatch overhead for many small tasks
    - **tqdm_kwargs: Additional arguments passed to tqdm constructor

    Returns:
    List of results in same order as input
    """

Usage Examples

Basic Thread-Based Processing

from tqdm.contrib.concurrent import thread_map
import requests
import time

def fetch_url(url):
    """Simulate an I/O-bound task: GET a URL and return its body size in bytes."""
    return len(requests.get(url).content)

# List of URLs to process; httpbin's /delay/<n> endpoint waits n seconds
# (here 0-2s) before responding, making each request genuinely I/O-bound
urls = [f"https://httpbin.org/delay/{i%3}" for i in range(20)]

# Parallel processing with progress bar; desc/unit customize the bar text
results = thread_map(
    fetch_url, 
    urls,
    max_workers=5,
    desc="Fetching URLs",
    unit="req"
)

# results preserve input order, one byte-count per URL
print(f"Downloaded {sum(results)} total bytes")

Basic Process-Based Processing

from tqdm.contrib.concurrent import process_map
import math
import time

def cpu_intensive_task(n):
    """Simulate a CPU-bound task: return the prime factorization of n.

    Uses simple trial division; factors are returned in ascending order
    with multiplicity (e.g. 12 -> [2, 2, 3]).
    """
    primes = []
    divisor = 2
    while divisor * divisor <= n:
        if n % divisor:
            divisor += 1
        else:
            n //= divisor
            primes.append(divisor)
    if n > 1:
        # Whatever remains is itself prime
        primes.append(n)
    return primes

# Large numbers to factorize (numbers of the form 2^i + 1 for i = 20..39)
numbers = [2**i + 1 for i in range(20, 40)]

# Parallel processing with progress bar; chunksize=2 dispatches numbers to
# workers in pairs, reducing inter-process overhead
results = process_map(
    cpu_intensive_task,
    numbers,
    max_workers=4,
    desc="Factoring",
    unit="num",
    chunksize=2
)

# process_map preserves input order, so results[i] corresponds to numbers[i]
for i, factors in enumerate(results):
    print(f"{numbers[i]} = {' * '.join(map(str, factors))}")

Advanced Thread Pool Management

from tqdm.contrib.concurrent import thread_map, ensure_lock
from tqdm.auto import tqdm
import concurrent.futures
import threading
import time
import requests

def download_with_retry(url, max_retries=3):
    """Download a URL with retry logic.

    Returns a dict with 'url', 'size' and 'attempt' on success, or
    'url', 'error' and 'attempt' once all retries are exhausted.
    """
    attempt = 0
    while attempt < max_retries:
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
        except Exception as exc:
            if attempt + 1 == max_retries:
                # Out of retries: report the failure as data, not an exception
                return {'url': url, 'error': str(exc), 'attempt': attempt + 1}
            time.sleep(2 ** attempt)  # Exponential backoff
            attempt += 1
        else:
            return {
                'url': url,
                'size': len(response.content),
                'attempt': attempt + 1
            }

# Large batch of URLs: every 10th returns HTTP 500, exercising the retry path
urls = [f"https://httpbin.org/status/{200 if i % 10 != 0 else 500}" 
        for i in range(100)]

# Parallel download with progress tracking; leave=True keeps the finished bar
results = thread_map(
    download_with_retry,
    urls,
    max_workers=10,
    desc="Downloading",
    unit="files",
    leave=True
)

# Analyze results: download_with_retry reports failures as dicts containing
# an 'error' key instead of raising, so no exception handling is needed here
successful = [r for r in results if 'error' not in r]
failed = [r for r in results if 'error' in r]

print(f"Downloaded: {len(successful)}, Failed: {len(failed)}")
if successful:
    total_size = sum(r['size'] for r in successful)
    avg_attempts = sum(r['attempt'] for r in successful) / len(successful)
    print(f"Total size: {total_size} bytes, Avg attempts: {avg_attempts:.1f}")

Custom Process Pool with Progress

from tqdm.contrib.concurrent import process_map
from tqdm.auto import tqdm
import multiprocessing as mp
import numpy as np
import time

def monte_carlo_pi(n_samples):
    """Draw n_samples random points in [-1, 1]^2 and count those in the unit circle."""
    # Re-seed from OS entropy so each worker process draws different samples
    np.random.seed()
    samples = np.random.uniform(-1, 1, (n_samples, 2))
    # A point is inside the circle when its squared distance from the origin <= 1
    return np.sum((samples ** 2).sum(axis=1) <= 1)

def estimate_pi_parallel(total_samples, n_chunks=None):
    """Estimate Pi by splitting a Monte Carlo run across worker processes.

    Returns (pi_estimate, points_inside_circle, total_samples).
    """
    n_chunks = mp.cpu_count() if n_chunks is None else n_chunks

    # Split the workload: equal-sized chunks, remainder folded into the last
    base = total_samples // n_chunks
    workloads = [base] * (n_chunks - 1)
    workloads.append(total_samples - base * (n_chunks - 1))

    # Run Monte Carlo simulations in parallel with a progress bar
    per_chunk_hits = process_map(
        monte_carlo_pi,
        workloads,
        desc="Estimating π",
        unit="chunk",
        max_workers=n_chunks
    )

    hits = sum(per_chunk_hits)
    # Area ratio: fraction inside the circle approximates pi/4
    return 4 * hits / total_samples, hits, total_samples

# Estimate Pi with 10 million samples split across 8 worker processes
pi_est, inside, total = estimate_pi_parallel(10_000_000, n_chunks=8)
# Relative error versus numpy's value of Pi, as a percentage
error = abs(pi_est - np.pi) / np.pi * 100

print(f"Pi estimate: {pi_est:.6f}")
print(f"Actual Pi: {np.pi:.6f}")
print(f"Error: {error:.4f}%")
print(f"Points inside circle: {inside:,} / {total:,}")

Mixed Threading and Processing

from tqdm.contrib.concurrent import thread_map, process_map
from tqdm.auto import tqdm
import concurrent.futures
import requests
import json
import time

def fetch_data(api_endpoint):
    """I/O-bound step: GET the endpoint and decode its JSON body."""
    # Note: raises (rather than returning None) on network/decoding errors
    return requests.get(api_endpoint).json()

def process_data(data_item):
    """CPU-bound step: derive a small record from a fetched item.

    The 'processed_value' is a deterministic toy checksum: the sum of the
    character codes of the item's repr, modulo 1000.
    """
    checksum = sum(ord(ch) for ch in str(data_item)) % 1000
    record = {
        'id': data_item.get('id'),
        'processed_value': checksum,
        'timestamp': time.time()
    }
    time.sleep(0.1)  # Simulate processing time
    return record

def hybrid_processing_pipeline(api_endpoints):
    """Pipeline combining I/O and CPU bound tasks.

    Fetches every endpoint with threads (I/O-bound), then post-processes
    the fetched items with processes (CPU-bound). Returns the list of
    processed records.
    """
    
    # Step 1: Fetch data in parallel (I/O-bound - use threads)
    print("Step 1: Fetching data from APIs...")
    raw_data = thread_map(
        fetch_data,
        api_endpoints,
        max_workers=10,
        desc="Fetching",
        unit="api"
    )
    
    # Filter out failed requests.
    # NOTE(review): fetch_data as written raises on failure rather than
    # returning None, so this filter only takes effect if fetch_data is
    # changed to swallow errors — confirm intended behavior.
    valid_data = [item for item in raw_data if item is not None]
    print(f"Successfully fetched {len(valid_data)} / {len(api_endpoints)} items")
    
    # Step 2: Process data in parallel (CPU-bound - use processes)
    print("Step 2: Processing data...")
    processed_data = process_map(
        process_data,
        valid_data,
        max_workers=4,
        desc="Processing",
        unit="item",
        chunksize=5
    )
    
    return processed_data

# Example usage: fetch 20 posts from a public test API, then post-process them
api_urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" 
            for i in range(1, 21)]

results = hybrid_processing_pipeline(api_urls)
print(f"Pipeline completed. Processed {len(results)} items.")

Error Handling and Resource Management

from tqdm.contrib.concurrent import thread_map, process_map, ensure_lock
from tqdm.auto import tqdm
import concurrent.futures
import time
import random

def unreliable_task(item):
    """A task with a simulated 10% random failure rate.

    Raises ValueError on failure; otherwise sleeps briefly (simulated work)
    and returns item doubled.
    """
    if random.random() < 0.1:  # 10% failure rate
        raise ValueError(f"Task failed for item: {item}")

    time.sleep(random.uniform(0.1, 0.5))  # Simulate work
    return item * 2

def robust_parallel_processing(items, use_processes=False):
    """Robust parallel processing with error handling.

    Runs unreliable_task over items via thread_map or process_map,
    capturing per-item exceptions instead of letting one failure abort
    the whole batch, then retries the failed items once with threads.

    Parameters:
    - items: Iterable of inputs passed to unreliable_task
    - use_processes: If True use process_map, otherwise thread_map

    Returns:
    (successful, failed) where each element is a result dict; after the
    retry pass, `failed` contains only items that are STILL failing.
    """
    
    def safe_task(item):
        """Wrapper that catches exceptions and reports them as data."""
        try:
            return {'success': True, 'result': unreliable_task(item), 'item': item}
        except Exception as e:
            return {'success': False, 'error': str(e), 'item': item}
    
    # Choose processing method based on task type.
    # NOTE(review): safe_task is a nested function and is not picklable, so
    # use_processes=True will fail with ProcessPoolExecutor — move safe_task
    # to module level if process-based execution is actually needed.
    if use_processes:
        results = process_map(
            safe_task,
            items,
            max_workers=2,
            desc="Processing (multiprocess)",
            unit="item"
        )
    else:
        results = thread_map(
            safe_task,
            items,
            max_workers=5,
            desc="Processing (multithreaded)",
            unit="item"
        )
    
    # Separate successful and failed results
    successful = [r for r in results if r['success']]
    failed = [r for r in results if not r['success']]
    
    print(f"Successful: {len(successful)}, Failed: {len(failed)}")
    
    # Retry failed items once (example of retry logic)
    if failed:
        print("Retrying failed items...")
        retry_items = [r['item'] for r in failed]
        
        # Retry with threads (might work better for different reasons)
        retry_results = thread_map(
            safe_task,
            retry_items,
            max_workers=2,
            desc="Retrying",
            unit="item"
        )
        
        retry_successful = [r for r in retry_results if r['success']]
        still_failed = [r for r in retry_results if not r['success']]
        
        print(f"Retry successful: {len(retry_successful)}, Still failed: {len(still_failed)}")
        successful.extend(retry_successful)
        # BUG FIX: report only items still failing after the retry pass;
        # previously the stale first-pass failure list was returned, so items
        # that succeeded on retry were counted both as successful and failed.
        failed = still_failed
    
    return successful, failed

# Test with sample data; use_processes=False exercises the thread-based path
test_items = list(range(1, 51))
success, failures = robust_parallel_processing(test_items, use_processes=False)

print(f"\nFinal results: {len(success)} successful, {len(failures)} failed")

Performance Monitoring and Optimization

from tqdm.contrib.concurrent import thread_map, process_map
from tqdm.auto import tqdm
import time
import psutil
import threading

class PerformanceMonitor:
    """Sample system-wide CPU and memory usage on a background thread.

    Call start() before the work being measured and stop() afterwards;
    stop() returns the collected samples.
    """
    
    def __init__(self, interval=1.0):
        self.interval = interval  # seconds between samples
        self.monitoring = False   # flag polled by the sampling loop
        self.stats = {'cpu': [], 'memory': [], 'timestamps': []}
        self.thread = None        # sampler thread, created by start()
    
    def start(self):
        """Launch the background sampling thread."""
        self.monitoring = True
        self.thread = threading.Thread(target=self._monitor)
        self.thread.daemon = True  # don't keep the interpreter alive
        self.thread.start()
    
    def stop(self):
        """Signal the sampler to stop, wait for it, and return the stats."""
        self.monitoring = False
        if self.thread is not None:
            self.thread.join()
        return self.stats
    
    def _monitor(self):
        """Sampling loop: record CPU%, memory% and a timestamp each interval."""
        stats = self.stats
        while self.monitoring:
            stats['cpu'].append(psutil.cpu_percent())
            stats['memory'].append(psutil.virtual_memory().percent)
            stats['timestamps'].append(time.time())
            time.sleep(self.interval)

def benchmark_processing_methods(items, task_func):
    """Compare thread_map vs process_map on the same workload.

    Times both mappers over `items`, samples system resources during each
    run, prints a comparison, and returns (thread_results, process_results).
    """
    
    def _timed(mapper, desc):
        """Run mapper over items under a PerformanceMonitor.

        Returns (results, elapsed_seconds, resource_stats).
        """
        monitor = PerformanceMonitor()
        monitor.start()
        began = time.time()
        out = mapper(
            task_func,
            items,
            max_workers=4,
            desc=desc,
            unit="item"
        )
        elapsed = time.time() - began
        return out, elapsed, monitor.stop()
    
    def _report_usage(label, stats):
        """Print average CPU/memory if any samples were collected."""
        if stats['cpu']:
            print(f"\nResource Usage ({label}):")
            print(f"  Average CPU: {sum(stats['cpu'])/len(stats['cpu']):.1f}%")
            print(f"  Average Memory: {sum(stats['memory'])/len(stats['memory']):.1f}%")
    
    # Test thread-based processing
    print("Testing thread-based processing...")
    thread_results, thread_time, thread_stats = _timed(thread_map, "Thread-based")
    
    # Test process-based processing
    print("Testing process-based processing...")
    process_results, process_time, process_stats = _timed(process_map, "Process-based")
    
    # Compare results
    print(f"\nPerformance Comparison:")
    print(f"Thread-based: {thread_time:.2f}s")
    print(f"Process-based: {process_time:.2f}s")
    print(f"Speedup ratio: {thread_time/process_time:.2f}x")
    
    _report_usage("Thread-based", thread_stats)
    _report_usage("Process-based", process_stats)
    
    return thread_results, process_results

# Example CPU-bound task for benchmarking
def cpu_task(n):
    """Simple CPU-bound task: sum of the squares of 0..n-1."""
    total = 0
    for i in range(n):
        total += i * i
    return total

# Benchmark with different workloads: 20 tasks each, differing in per-task cost
small_items = [1000] * 20
large_items = [10000] * 20

print("Benchmarking small workload...")
benchmark_processing_methods(small_items, cpu_task)

print("\nBenchmarking large workload...")
benchmark_processing_methods(large_items, cpu_task)

Best Practices

Choosing Between Threads and Processes

Use thread_map for:

  • I/O-bound tasks (file operations, network requests, database queries)
  • Tasks that share data or state
  • When memory usage is a concern
  • Quick tasks with low computational overhead

Use process_map for:

  • CPU-bound tasks (mathematical computations, data processing)
  • Tasks that can be easily parallelized
  • When maximum CPU utilization is needed
  • Tasks that don't require shared state

Performance Optimization

Chunking Strategy:

  • Use larger chunksize for small, fast tasks
  • Use smaller chunksize for large, variable-duration tasks
  • Monitor memory usage with large chunks

Worker Count:

  • Threads: Usually min(32, cpu_count + 4) for I/O-bound tasks
  • Processes: Usually cpu_count for CPU-bound tasks
  • Adjust based on system resources and task characteristics

Error Handling:

  • Always wrap tasks in try/except for robust processing
  • Consider retry mechanisms for transient failures
  • Use progress bar postfix to display error counts

Memory Management

  • Be aware of memory multiplication in process pools
  • Use generators or iterators for large datasets
  • Monitor system resources during processing
  • Consider streaming approaches for very large datasets

Install with Tessl CLI

npx tessl i tessl/pypi-tqdm

docs

core.md

environments.md

index.md

integrations.md

parallel.md

utilities.md

tile.json