# Parallel Processing

Utilities for parallel processing with automatic progress tracking. These functions provide drop-in replacements for standard parallel processing patterns while adding progress bars and proper resource management.

## Capabilities

### Thread-Based Parallel Processing

High-level interface for thread-based parallel execution with automatic progress tracking and resource management.

```python { .api }
from tqdm.contrib.concurrent import thread_map, ensure_lock

def thread_map(fn, *iterables, max_workers=None, chunksize=1, **tqdm_kwargs):
    """
    Parallel mapping using ThreadPoolExecutor with progress tracking.

    Equivalent to concurrent.futures.ThreadPoolExecutor().map() but with
    a tqdm progress bar. Suitable for I/O-bound tasks.

    Parameters:
    - fn: Function to apply to each element
    - *iterables: One or more iterables to process
    - max_workers: Maximum number of threads (default: min(32, cpu_count + 4))
    - chunksize: Size of chunks for batching (default: 1)
    - **tqdm_kwargs: Additional arguments passed to the tqdm constructor

    Returns:
    List of results in the same order as the input
    """

def ensure_lock(tqdm_class, lock_name=""):
    """
    Context manager ensuring proper thread locking for progress bars.

    Parameters:
    - tqdm_class: tqdm class to use for locking
    - lock_name: Optional lock identifier for debugging

    Yields:
    The shared lock of tqdm_class, guaranteeing thread-safe
    progress bar operations
    """
```
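
For intuition, `thread_map` behaves much like wrapping `ThreadPoolExecutor.map` in a `list` call; a minimal stdlib-only sketch (no progress bar, and `naive_thread_map` is an illustrative name, not part of tqdm):

```python
from concurrent.futures import ThreadPoolExecutor

def naive_thread_map(fn, *iterables, max_workers=None):
    # Submit work to a thread pool and collect the results,
    # preserving input order (like thread_map, minus the bar).
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        return list(ex.map(fn, *iterables))

print(naive_thread_map(lambda x: x * 2, range(5)))  # [0, 2, 4, 6, 8]
```

Like `thread_map`, this returns results in input order and accepts multiple iterables, which are consumed in lockstep.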

### Process-Based Parallel Processing

High-level interface for process-based parallel execution with progress tracking, suitable for CPU-intensive tasks.

```python { .api }
from tqdm.contrib.concurrent import process_map

def process_map(fn, *iterables, max_workers=None, chunksize=1, **tqdm_kwargs):
    """
    Parallel mapping using ProcessPoolExecutor with progress tracking.

    Equivalent to concurrent.futures.ProcessPoolExecutor().map() but with
    a tqdm progress bar. Suitable for CPU-bound tasks.

    Parameters:
    - fn: Function to apply to each element (must be picklable)
    - *iterables: One or more iterables to process
    - max_workers: Maximum number of processes (default: cpu_count)
    - chunksize: Size of chunks for batching (default: 1)
    - **tqdm_kwargs: Additional arguments passed to the tqdm constructor

    Returns:
    List of results in the same order as the input
    """
```
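
Because `process_map` ships `fn` to worker processes by pickling, lambdas and locally defined closures will not work. A quick way to check a candidate function (an illustrative sketch; `is_picklable` is not part of tqdm):

```python
import pickle

def square(x):
    return x * x

def is_picklable(obj):
    # Rough check for whether fn can be shipped to a worker process.
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

print(is_picklable(square))       # top-level functions usually pickle fine
print(is_picklable(lambda x: x))  # lambdas do not
```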

## Usage Examples

### Basic Thread-Based Processing

```python
from tqdm.contrib.concurrent import thread_map
import requests

def fetch_url(url):
    """Simulate I/O-bound task"""
    response = requests.get(url)
    return len(response.content)

# List of URLs to process
urls = [f"https://httpbin.org/delay/{i%3}" for i in range(20)]

# Parallel processing with progress bar
results = thread_map(
    fetch_url,
    urls,
    max_workers=5,
    desc="Fetching URLs",
    unit="req"
)

print(f"Downloaded {sum(results)} total bytes")
```

### Basic Process-Based Processing

```python
from tqdm.contrib.concurrent import process_map

def cpu_intensive_task(n):
    """Simulate CPU-bound task"""
    # Calculate prime factors
    factors = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            factors.append(d)
            n //= d
        d += 1
    if n > 1:
        factors.append(n)
    return factors

# Large numbers to factorize
numbers = [2**i + 1 for i in range(20, 40)]

# Parallel processing with progress bar
results = process_map(
    cpu_intensive_task,
    numbers,
    max_workers=4,
    desc="Factoring",
    unit="num",
    chunksize=2
)

for i, factors in enumerate(results):
    print(f"{numbers[i]} = {' * '.join(map(str, factors))}")
```

### Advanced Thread Pool Management

```python
from tqdm.contrib.concurrent import thread_map
import time
import requests

def download_with_retry(url, max_retries=3):
    """Download with retry logic"""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return {
                'url': url,
                'size': len(response.content),
                'attempt': attempt + 1
            }
        except Exception as e:
            if attempt == max_retries - 1:
                return {'url': url, 'error': str(e), 'attempt': attempt + 1}
            time.sleep(2 ** attempt)  # Exponential backoff

# Large batch of URLs
urls = [f"https://httpbin.org/status/{200 if i % 10 != 0 else 500}"
        for i in range(100)]

# Parallel download with progress tracking
results = thread_map(
    download_with_retry,
    urls,
    max_workers=10,
    desc="Downloading",
    unit="files",
    leave=True
)

# Analyze results
successful = [r for r in results if 'error' not in r]
failed = [r for r in results if 'error' in r]

print(f"Downloaded: {len(successful)}, Failed: {len(failed)}")
if successful:
    total_size = sum(r['size'] for r in successful)
    avg_attempts = sum(r['attempt'] for r in successful) / len(successful)
    print(f"Total size: {total_size} bytes, Avg attempts: {avg_attempts:.1f}")
```

### Custom Process Pool with Progress

```python
from tqdm.contrib.concurrent import process_map
import multiprocessing as mp
import numpy as np

def monte_carlo_pi(n_samples):
    """Estimate Pi using the Monte Carlo method"""
    np.random.seed()  # Ensure different seeds in each process
    points = np.random.uniform(-1, 1, (n_samples, 2))
    inside_circle = np.sum(np.sum(points**2, axis=1) <= 1)
    return inside_circle

def estimate_pi_parallel(total_samples, n_chunks=None):
    """Parallel Pi estimation with progress tracking"""
    if n_chunks is None:
        n_chunks = mp.cpu_count()

    chunk_size = total_samples // n_chunks
    chunks = [chunk_size] * (n_chunks - 1) + [total_samples - chunk_size * (n_chunks - 1)]

    # Run Monte Carlo simulations in parallel
    inside_counts = process_map(
        monte_carlo_pi,
        chunks,
        desc="Estimating π",
        unit="chunk",
        max_workers=n_chunks
    )

    total_inside = sum(inside_counts)
    pi_estimate = 4 * total_inside / total_samples

    return pi_estimate, total_inside, total_samples

# Estimate Pi with 10 million samples
pi_est, inside, total = estimate_pi_parallel(10_000_000, n_chunks=8)
error = abs(pi_est - np.pi) / np.pi * 100

print(f"Pi estimate: {pi_est:.6f}")
print(f"Actual Pi: {np.pi:.6f}")
print(f"Error: {error:.4f}%")
print(f"Points inside circle: {inside:,} / {total:,}")
```

### Mixed Threading and Processing

```python
from tqdm.contrib.concurrent import thread_map, process_map
import requests
import time

def fetch_data(api_endpoint):
    """I/O-bound: Fetch data from API"""
    response = requests.get(api_endpoint)
    return response.json()

def process_data(data_item):
    """CPU-bound: Process fetched data"""
    # Simulate heavy computation
    result = {
        'id': data_item.get('id'),
        'processed_value': sum(ord(c) for c in str(data_item)) % 1000,
        'timestamp': time.time()
    }
    time.sleep(0.1)  # Simulate processing time
    return result

def hybrid_processing_pipeline(api_endpoints):
    """Pipeline combining I/O-bound and CPU-bound tasks"""

    # Step 1: Fetch data in parallel (I/O-bound - use threads)
    print("Step 1: Fetching data from APIs...")
    raw_data = thread_map(
        fetch_data,
        api_endpoints,
        max_workers=10,
        desc="Fetching",
        unit="api"
    )

    # Filter out failed requests
    valid_data = [item for item in raw_data if item is not None]
    print(f"Successfully fetched {len(valid_data)} / {len(api_endpoints)} items")

    # Step 2: Process data in parallel (CPU-bound - use processes)
    print("Step 2: Processing data...")
    processed_data = process_map(
        process_data,
        valid_data,
        max_workers=4,
        desc="Processing",
        unit="item",
        chunksize=5
    )

    return processed_data

# Example usage
api_urls = [f"https://jsonplaceholder.typicode.com/posts/{i}"
            for i in range(1, 21)]

results = hybrid_processing_pipeline(api_urls)
print(f"Pipeline completed. Processed {len(results)} items.")
```
299
300
### Error Handling and Resource Management
301
302
```python
from tqdm.contrib.concurrent import thread_map, process_map
import time
import random

def unreliable_task(item):
    """Task that sometimes fails"""
    # Simulate random failures
    if random.random() < 0.1:  # 10% failure rate
        raise ValueError(f"Task failed for item: {item}")

    # Simulate work
    time.sleep(random.uniform(0.1, 0.5))
    return item * 2

def safe_task(item):
    """Wrapper that catches exceptions.

    Defined at module level (not nested) so it stays picklable
    when used with process_map.
    """
    try:
        return {'success': True, 'result': unreliable_task(item), 'item': item}
    except Exception as e:
        return {'success': False, 'error': str(e), 'item': item}

def robust_parallel_processing(items, use_processes=False):
    """Robust parallel processing with error handling"""

    # Choose processing method based on task type
    if use_processes:
        results = process_map(
            safe_task,
            items,
            max_workers=2,
            desc="Processing (multiprocess)",
            unit="item"
        )
    else:
        results = thread_map(
            safe_task,
            items,
            max_workers=5,
            desc="Processing (multithreaded)",
            unit="item"
        )

    # Separate successful and failed results
    successful = [r for r in results if r['success']]
    failed = [r for r in results if not r['success']]

    print(f"Successful: {len(successful)}, Failed: {len(failed)}")

    # Retry failed items once (transient failures often succeed on retry)
    if failed:
        print("Retrying failed items...")
        retry_items = [r['item'] for r in failed]

        retry_results = thread_map(
            safe_task,
            retry_items,
            max_workers=2,
            desc="Retrying",
            unit="item"
        )

        retry_successful = [r for r in retry_results if r['success']]
        still_failed = [r for r in retry_results if not r['success']]

        print(f"Retry successful: {len(retry_successful)}, Still failed: {len(still_failed)}")
        successful.extend(retry_successful)
        failed = still_failed  # Only report items that failed both times

    return successful, failed

# Test with sample data
test_items = list(range(1, 51))
success, failures = robust_parallel_processing(test_items, use_processes=False)

print(f"\nFinal results: {len(success)} successful, {len(failures)} failed")
```
381
382
### Performance Monitoring and Optimization
383
384
```python
from tqdm.contrib.concurrent import thread_map, process_map
import time
import psutil
import threading

class PerformanceMonitor:
    """Monitor system resources during parallel processing"""

    def __init__(self, interval=1.0):
        self.interval = interval
        self.monitoring = False
        self.stats = {'cpu': [], 'memory': [], 'timestamps': []}
        self.thread = None

    def start(self):
        """Start monitoring system resources"""
        self.monitoring = True
        self.thread = threading.Thread(target=self._monitor)
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        """Stop monitoring and return stats"""
        self.monitoring = False
        if self.thread:
            self.thread.join()
        return self.stats

    def _monitor(self):
        """Internal monitoring loop"""
        while self.monitoring:
            self.stats['cpu'].append(psutil.cpu_percent())
            self.stats['memory'].append(psutil.virtual_memory().percent)
            self.stats['timestamps'].append(time.time())
            time.sleep(self.interval)

def benchmark_processing_methods(items, task_func):
    """Compare thread_map vs process_map performance"""

    # Test thread-based processing
    print("Testing thread-based processing...")
    monitor = PerformanceMonitor()
    monitor.start()

    start_time = time.time()
    thread_results = thread_map(
        task_func,
        items,
        max_workers=4,
        desc="Thread-based",
        unit="item"
    )
    thread_time = time.time() - start_time
    thread_stats = monitor.stop()

    # Test process-based processing
    print("Testing process-based processing...")
    monitor = PerformanceMonitor()
    monitor.start()

    start_time = time.time()
    process_results = process_map(
        task_func,
        items,
        max_workers=4,
        desc="Process-based",
        unit="item"
    )
    process_time = time.time() - start_time
    process_stats = monitor.stop()

    # Compare results
    print("\nPerformance Comparison:")
    print(f"Thread-based: {thread_time:.2f}s")
    print(f"Process-based: {process_time:.2f}s")
    print(f"Speedup ratio: {thread_time/process_time:.2f}x")

    if thread_stats['cpu']:
        print("\nResource Usage (Thread-based):")
        print(f"  Average CPU: {sum(thread_stats['cpu'])/len(thread_stats['cpu']):.1f}%")
        print(f"  Average Memory: {sum(thread_stats['memory'])/len(thread_stats['memory']):.1f}%")

    if process_stats['cpu']:
        print("\nResource Usage (Process-based):")
        print(f"  Average CPU: {sum(process_stats['cpu'])/len(process_stats['cpu']):.1f}%")
        print(f"  Average Memory: {sum(process_stats['memory'])/len(process_stats['memory']):.1f}%")

    return thread_results, process_results

# Example CPU-bound task for benchmarking
def cpu_task(n):
    """Simple CPU-bound task"""
    return sum(i**2 for i in range(n))

# Benchmark with different workloads
small_items = [1000] * 20
large_items = [10000] * 20

print("Benchmarking small workload...")
benchmark_processing_methods(small_items, cpu_task)

print("\nBenchmarking large workload...")
benchmark_processing_methods(large_items, cpu_task)
```

## Best Practices

### Choosing Between Threads and Processes

**Use `thread_map` for:**
- I/O-bound tasks (file operations, network requests, database queries)
- Tasks that share data or state
- When memory usage is a concern
- Quick tasks with low computational overhead

**Use `process_map` for:**
- CPU-bound tasks (mathematical computations, data processing)
- Tasks that can be easily parallelized
- When maximum CPU utilization is needed
- Tasks that don't require shared state

### Performance Optimization

**Chunking Strategy:**
- Use larger `chunksize` for small, fast tasks
- Use smaller `chunksize` for large, variable-duration tasks
- Monitor memory usage with large chunks
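
Conceptually, chunking just batches the input so each dispatch to a worker covers several items; a rough sketch of the idea (`chunked` is illustrative, not the tqdm internals):

```python
def chunked(seq, chunksize):
    # Split seq into consecutive batches of at most `chunksize` items;
    # larger batches mean fewer, cheaper dispatches to the pool.
    return [seq[i:i + chunksize] for i in range(0, len(seq), chunksize)]

print(chunked(list(range(7)), 3))  # [[0, 1, 2], [3, 4, 5], [6]]
```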

**Worker Count:**
- Threads: the ThreadPoolExecutor default, `min(32, cpu_count + 4)`, suits most I/O-bound tasks
- Processes: usually `cpu_count` (the ProcessPoolExecutor default) for CPU-bound tasks
- Adjust based on system resources and task characteristics
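
Those defaults can be computed directly; the formulas below match recent CPython's `concurrent.futures` (3.8+):

```python
import os

cpu = os.cpu_count() or 1
default_threads = min(32, cpu + 4)  # ThreadPoolExecutor default
default_processes = cpu             # ProcessPoolExecutor default

print(default_threads, default_processes)
```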

**Error Handling:**
- Always wrap tasks in try/except for robust processing
- Consider retry mechanisms for transient failures
- Use the progress bar postfix to display error counts

### Memory Management

- Be aware of memory multiplication in process pools
- Use generators or iterators for large datasets
- Monitor system resources during processing
- Consider streaming approaches for very large datasets
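
One way to keep memory bounded is to stream a large input through the pool in fixed-size batches instead of materializing all inputs and results at once; a stdlib-only sketch (the batch size and workload are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

def batches(iterable, size):
    # Yield lists of at most `size` items from any iterable,
    # without ever holding the full input in memory.
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

total = 0
with ThreadPoolExecutor(max_workers=4) as ex:
    for batch in batches(range(100), size=25):
        # Only one batch of inputs and results is alive at a time.
        total += sum(ex.map(lambda x: x * x, batch))

print(total)  # sum of squares 0..99 = 328350
```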