Fast, extensible progress meter for loops and iterators in Python
Utilities for parallel processing with automatic progress tracking. These functions provide drop-in replacements for standard parallel processing patterns while adding progress bars and proper resource management.
High-level interface for thread-based parallel execution with automatic progress tracking and resource management.
from tqdm.contrib.concurrent import thread_map, ensure_lock
# API signature stub from the tqdm docs; the implementation lives in
# tqdm.contrib.concurrent.
def thread_map(fn, *iterables, max_workers=None, chunksize=1, **tqdm_kwargs):
    """
    Parallel map over *iterables* using a ThreadPoolExecutor, with a tqdm bar.

    Equivalent to ``concurrent.futures.ThreadPoolExecutor().map()`` but wraps
    the iteration in a tqdm progress bar; suited to I/O-bound tasks.

    Parameters:
    - fn: function to apply to each element
    - *iterables: one or more iterables to process
    - max_workers: maximum number of threads (default: min(32, cpu_count + 4))
    - chunksize: size of chunks for batching (default: 1)
    - **tqdm_kwargs: additional arguments passed to the tqdm constructor

    Returns:
    List of results in the same order as the input.
    """
# API signature stub from the tqdm docs; the implementation lives in
# tqdm.contrib.concurrent.
def ensure_lock(tqdm_class, lock_name=""):
    """
    Context manager guaranteeing thread-safe progress-bar operations.

    Parameters:
    - tqdm_class: tqdm class whose lock should be used
    - lock_name: optional lock identifier, useful for debugging

    Yields:
    A context in which progress-bar updates are protected by the lock.
    """
"""High-level interface for process-based parallel execution with progress tracking, suitable for CPU-intensive tasks.
from tqdm.contrib.concurrent import process_map
# API signature stub from the tqdm docs; the implementation lives in
# tqdm.contrib.concurrent.
def process_map(fn, *iterables, max_workers=None, chunksize=1, **tqdm_kwargs):
    """
    Parallel map over *iterables* using a ProcessPoolExecutor, with a tqdm bar.

    Equivalent to ``concurrent.futures.ProcessPoolExecutor().map()`` but wraps
    the iteration in a tqdm progress bar; suited to CPU-bound tasks.

    Parameters:
    - fn: function to apply to each element (must be picklable)
    - *iterables: one or more iterables to process
    - max_workers: maximum number of processes (default: cpu_count)
    - chunksize: size of chunks for batching (default: 1)
    - **tqdm_kwargs: additional arguments passed to the tqdm constructor

    Returns:
    List of results in the same order as the input.
    """
"""from tqdm.contrib.concurrent import thread_map
import requests
import time


def fetch_url(url):
    """Simulate an I/O-bound task: download *url*, return its size in bytes."""
    return len(requests.get(url).content)


# List of URLs to process (httpbin delays each response by 0-2 seconds)
urls = [f"https://httpbin.org/delay/{i%3}" for i in range(20)]

# Parallel processing with a progress bar
results = thread_map(
    fetch_url,
    urls,
    max_workers=5,
    desc="Fetching URLs",
    unit="req",
)
print(f"Downloaded {sum(results)} total bytes")

from tqdm.contrib.concurrent import process_map
import math
import time
def cpu_intensive_task(n):
    """Simulate CPU-bound work: return the prime factorization of *n*.

    Factors are returned smallest-first, with multiplicity; 1 yields [].
    """
    prime_factors = []
    divisor = 2
    # Trial division: once divisor*divisor exceeds the remaining n, the
    # remainder itself (if greater than 1) must be the last prime factor.
    while divisor * divisor <= n:
        while n % divisor == 0:
            prime_factors.append(divisor)
            n //= divisor
        divisor += 1
    if n > 1:
        prime_factors.append(n)
    return prime_factors
# Large numbers to factorize
numbers = [2**i + 1 for i in range(20, 40)]

# Parallel processing with a progress bar; chunksize=2 batches the work
results = process_map(
    cpu_intensive_task,
    numbers,
    max_workers=4,
    desc="Factoring",
    unit="num",
    chunksize=2,
)
for i, factors in enumerate(results):
    print(f"{numbers[i]} = {' * '.join(map(str, factors))}")

from tqdm.contrib.concurrent import thread_map, ensure_lock
from tqdm.auto import tqdm
import concurrent.futures
import threading
import time
import requests
def download_with_retry(url, max_retries=3):
    """Download *url* with retries and exponential backoff.

    Returns a dict with 'url', 'size' and the 1-based 'attempt' that
    succeeded, or with 'url', 'error' and 'attempt' after the final failure.
    """
    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.get(url, timeout=10)
            resp.raise_for_status()
            return {
                'url': url,
                'size': len(resp.content),
                'attempt': attempt
            }
        except Exception as exc:
            if attempt == max_retries:
                return {'url': url, 'error': str(exc), 'attempt': attempt}
            # Exponential backoff: 1s, 2s, 4s, ...
            time.sleep(2 ** (attempt - 1))
# Large batch of URLs; every 10th returns HTTP 500 to exercise the retry path
urls = [f"https://httpbin.org/status/{200 if i % 10 != 0 else 500}"
        for i in range(100)]

# Parallel download with progress tracking
results = thread_map(
    download_with_retry,
    urls,
    max_workers=10,
    desc="Downloading",
    unit="files",
    leave=True,
)

# Analyze results: a result dict with an 'error' key marks a failure
successful = [r for r in results if 'error' not in r]
failed = [r for r in results if 'error' in r]
print(f"Downloaded: {len(successful)}, Failed: {len(failed)}")
if successful:
    total_size = sum(r['size'] for r in successful)
    avg_attempts = sum(r['attempt'] for r in successful) / len(successful)
    print(f"Total size: {total_size} bytes, Avg attempts: {avg_attempts:.1f}")

from tqdm.contrib.concurrent import process_map
from tqdm.auto import tqdm
import multiprocessing as mp
import numpy as np
import time
def monte_carlo_pi(n_samples):
    """Return how many of *n_samples* uniform points land in the unit circle."""
    np.random.seed()  # Reseed so each worker process draws different numbers
    xy = np.random.uniform(-1, 1, (n_samples, 2))
    # A point (x, y) is inside the circle when x**2 + y**2 <= 1.
    return np.sum((xy ** 2).sum(axis=1) <= 1)
def estimate_pi_parallel(total_samples, n_chunks=None):
    """Estimate Pi by splitting a Monte Carlo simulation across processes.

    Returns (pi_estimate, total_inside, total_samples).
    """
    if n_chunks is None:
        n_chunks = mp.cpu_count()

    # Split total_samples into n_chunks pieces; the final piece absorbs the
    # remainder so the chunk sizes sum exactly to total_samples.
    chunk_size = total_samples // n_chunks
    chunks = [chunk_size] * (n_chunks - 1)
    chunks.append(total_samples - chunk_size * (n_chunks - 1))

    # Run the Monte Carlo chunks in parallel with a progress bar
    inside_counts = process_map(
        monte_carlo_pi,
        chunks,
        desc="Estimating π",
        unit="chunk",
        max_workers=n_chunks,
    )

    total_inside = sum(inside_counts)
    pi_estimate = 4 * total_inside / total_samples
    return pi_estimate, total_inside, total_samples
# Estimate Pi with 10 million samples spread over 8 worker processes
pi_est, inside, total = estimate_pi_parallel(10_000_000, n_chunks=8)
error = abs(pi_est - np.pi) / np.pi * 100
print(f"Pi estimate: {pi_est:.6f}")
print(f"Actual Pi: {np.pi:.6f}")
print(f"Error: {error:.4f}%")
print(f"Points inside circle: {inside:,} / {total:,}")

from tqdm.contrib.concurrent import thread_map, process_map
from tqdm.auto import tqdm
import concurrent.futures
import requests
import json
import time
def fetch_data(api_endpoint):
    """I/O-bound step: GET *api_endpoint* and decode the JSON body."""
    return requests.get(api_endpoint).json()
def process_data(data_item):
    """CPU-bound step: build a small summary record for one fetched item.

    Returns a dict with the item's 'id', a checksum-style 'processed_value'
    (sum of character codes of the item's repr, mod 1000) and a 'timestamp'.
    """
    summary = {
        'id': data_item.get('id'),
        'processed_value': sum(ord(c) for c in str(data_item)) % 1000,
        'timestamp': time.time()
    }
    time.sleep(0.1)  # Simulate processing time
    return summary
def hybrid_processing_pipeline(api_endpoints):
    """Two-stage pipeline: fetch with threads (I/O), then crunch with processes (CPU)."""
    # Stage 1: fetch in parallel -- I/O-bound, so threads are appropriate
    print("Step 1: Fetching data from APIs...")
    raw_data = thread_map(
        fetch_data,
        api_endpoints,
        max_workers=10,
        desc="Fetching",
        unit="api",
    )

    # Drop failed requests (None entries) before the CPU stage
    valid_data = [item for item in raw_data if item is not None]
    print(f"Successfully fetched {len(valid_data)} / {len(api_endpoints)} items")

    # Stage 2: process in parallel -- CPU-bound, so processes are appropriate
    print("Step 2: Processing data...")
    return process_map(
        process_data,
        valid_data,
        max_workers=4,
        desc="Processing",
        unit="item",
        chunksize=5,
    )
# Example usage: posts 1..20 from the JSONPlaceholder demo API
api_urls = [f"https://jsonplaceholder.typicode.com/posts/{i}"
            for i in range(1, 21)]
results = hybrid_processing_pipeline(api_urls)
print(f"Pipeline completed. Processed {len(results)} items.")

from tqdm.contrib.concurrent import thread_map, process_map, ensure_lock
from tqdm.auto import tqdm
import concurrent.futures
import time
import random
def unreliable_task(item):
    """Double *item* after a short random delay, failing ~10% of the time.

    Raises ValueError on the simulated failures so callers can exercise
    retry/error-handling logic.
    """
    if random.random() < 0.1:  # 10% failure rate
        raise ValueError(f"Task failed for item: {item}")
    time.sleep(random.uniform(0.1, 0.5))  # Simulate work
    return item * 2
def robust_parallel_processing(items, use_processes=False):
    """
    Run ``unreliable_task`` over *items* in parallel, capturing errors and
    retrying failures once.

    Parameters:
    - items: sequence of inputs passed to ``unreliable_task``
    - use_processes: if True use ``process_map`` (CPU-bound work),
      otherwise ``thread_map`` (I/O-bound work)

    Returns:
    (successful, failed) -- ``successful`` holds result dicts for every item
    that eventually succeeded (first pass or retry); ``failed`` holds only
    the items that still failed after the retry pass.
    """
    def safe_task(item):
        """Wrapper that converts exceptions into result dicts."""
        try:
            return {'success': True, 'result': unreliable_task(item), 'item': item}
        except Exception as e:
            return {'success': False, 'error': str(e), 'item': item}

    # NOTE(review): nested functions are generally not picklable, so the
    # use_processes=True path likely needs safe_task at module level on
    # spawn-based platforms -- confirm before enabling it.
    if use_processes:
        results = process_map(
            safe_task,
            items,
            max_workers=2,
            desc="Processing (multiprocess)",
            unit="item"
        )
    else:
        results = thread_map(
            safe_task,
            items,
            max_workers=5,
            desc="Processing (multithreaded)",
            unit="item"
        )

    # Partition first-pass results.
    successful = [r for r in results if r['success']]
    failed = [r for r in results if not r['success']]
    print(f"Successful: {len(successful)}, Failed: {len(failed)}")

    # Retry failed items once (example of retry logic).
    if failed:
        print("Retrying failed items...")
        retry_items = [r['item'] for r in failed]
        retry_results = thread_map(
            safe_task,
            retry_items,
            max_workers=2,
            desc="Retrying",
            unit="item"
        )
        retry_successful = [r for r in retry_results if r['success']]
        still_failed = [r for r in retry_results if not r['success']]
        print(f"Retry successful: {len(retry_successful)}, Still failed: {len(still_failed)}")
        successful.extend(retry_successful)
        # BUG FIX: previously the first-pass failure list was returned even
        # after retries, double-counting items that succeeded on retry.
        failed = still_failed

    return successful, failed
# Exercise the robust pipeline with sample data (threads, not processes)
test_items = list(range(1, 51))
success, failures = robust_parallel_processing(test_items, use_processes=False)
print(f"\nFinal results: {len(success)} successful, {len(failures)} failed")

from tqdm.contrib.concurrent import thread_map, process_map
from tqdm.auto import tqdm
import time
import psutil
import threading
class PerformanceMonitor:
    """Sample system CPU/memory usage on a background thread.

    Call start() before the work and stop() after; stop() returns the
    collected samples.
    """

    def __init__(self, interval=1.0):
        # interval: sampling period in seconds
        self.interval = interval
        self.monitoring = False
        self.stats = {'cpu': [], 'memory': [], 'timestamps': []}
        self.thread = None

    def start(self):
        """Begin sampling in a daemon thread (won't block interpreter exit)."""
        self.monitoring = True
        self.thread = threading.Thread(target=self._monitor, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop sampling, wait for the sampler thread, and return the stats."""
        self.monitoring = False
        if self.thread:
            self.thread.join()
        return self.stats

    def _monitor(self):
        """Sampling loop: record CPU%, memory% and a timestamp per interval."""
        while self.monitoring:
            self.stats['cpu'].append(psutil.cpu_percent())
            self.stats['memory'].append(psutil.virtual_memory().percent)
            self.stats['timestamps'].append(time.time())
            time.sleep(self.interval)
def benchmark_processing_methods(items, task_func):
    """
    Compare thread_map against process_map on the same workload.

    Runs task_func over items once with threads and once with processes,
    timing each run and sampling CPU/memory via PerformanceMonitor, then
    prints a comparison and returns both result lists.
    """
    def _timed_run(mapper, desc):
        # Run one mapper under resource monitoring;
        # return (results, elapsed_seconds, stats).
        monitor = PerformanceMonitor()
        monitor.start()
        begin = time.time()
        mapped = mapper(
            task_func,
            items,
            max_workers=4,
            desc=desc,
            unit="item"
        )
        elapsed = time.time() - begin
        return mapped, elapsed, monitor.stop()

    print("Testing thread-based processing...")
    thread_results, thread_time, thread_stats = _timed_run(thread_map, "Thread-based")

    print("Testing process-based processing...")
    process_results, process_time, process_stats = _timed_run(process_map, "Process-based")

    # Compare wall-clock results
    print(f"\nPerformance Comparison:")
    print(f"Thread-based: {thread_time:.2f}s")
    print(f"Process-based: {process_time:.2f}s")
    print(f"Speedup ratio: {thread_time/process_time:.2f}x")

    # Report resource usage for each run (skipped if no samples were taken)
    for label, stats in (("Thread-based", thread_stats), ("Process-based", process_stats)):
        if stats['cpu']:
            print(f"\nResource Usage ({label}):")
            print(f"  Average CPU: {sum(stats['cpu'])/len(stats['cpu']):.1f}%")
            print(f"  Average Memory: {sum(stats['memory'])/len(stats['memory']):.1f}%")

    return thread_results, process_results
# Example CPU-bound task for benchmarking
def cpu_task(n):
    """Simple CPU-bound task: sum of squares 0**2 .. (n-1)**2."""
    total = 0
    for i in range(n):
        total += i * i
    return total
# Benchmark with different workloads
small_items = [1000] * 20
large_items = [10000] * 20
print("Benchmarking small workload...")
benchmark_processing_methods(small_items, cpu_task)
print("\nBenchmarking large workload...")
benchmark_processing_methods(large_items, cpu_task)Use thread_map for:
Use process_map for:
Chunking Strategy:
- Use a larger chunksize for small, fast tasks
- Use a smaller chunksize for large, variable-duration tasks
Worker Count:
- Default to cpu_count + 4 workers for I/O-bound tasks
- Default to cpu_count workers for CPU-bound tasks
Error Handling:
Install with Tessl CLI
npx tessl i tessl/pypi-tqdm