# Parallel Processing

Utilities for parallel processing with automatic progress tracking. These functions provide drop-in replacements for standard parallel processing patterns while adding progress bars and proper resource management.

## Capabilities

### Thread-Based Parallel Processing

High-level interface for thread-based parallel execution with automatic progress tracking and resource management.

```python { .api }
from tqdm.contrib.concurrent import thread_map, ensure_lock

def thread_map(fn, *iterables, max_workers=None, chunksize=1, **tqdm_kwargs):
    """
    Parallel mapping using ThreadPoolExecutor with progress tracking.

    Equivalent to concurrent.futures.ThreadPoolExecutor().map() but with
    a tqdm progress bar. Suitable for I/O-bound tasks.

    Parameters:
    - fn: Function to apply to each element
    - *iterables: One or more iterables to process
    - max_workers: Maximum number of threads (default: min(32, cpu_count + 4))
    - chunksize: Size of chunks for batching (default: 1)
    - **tqdm_kwargs: Additional arguments passed to the tqdm constructor

    Returns:
    List of results in the same order as the input
    """

def ensure_lock(tqdm_class, lock_name=""):
    """
    Context manager ensuring proper thread locking for progress bars.

    Parameters:
    - tqdm_class: tqdm class to use for locking
    - lock_name: Optional lock identifier for debugging

    Yields:
    Context with guaranteed thread-safe progress bar operations
    """
```
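Stripped of the progress bar, `thread_map` behaves like the stdlib executor it wraps. This minimal sketch (the `double` function is a made-up example) shows the equivalent `ThreadPoolExecutor.map` call and the order guarantee:

```python
from concurrent.futures import ThreadPoolExecutor

def double(x):
    return x * 2

# Equivalent of thread_map(double, range(5)) without the progress bar:
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(double, range(5)))

print(results)  # results preserve input order: [0, 2, 4, 6, 8]
```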

### Process-Based Parallel Processing

High-level interface for process-based parallel execution with progress tracking, suitable for CPU-intensive tasks.

```python { .api }
from tqdm.contrib.concurrent import process_map

def process_map(fn, *iterables, max_workers=None, chunksize=1, **tqdm_kwargs):
    """
    Parallel mapping using ProcessPoolExecutor with progress tracking.

    Equivalent to concurrent.futures.ProcessPoolExecutor().map() but with
    a tqdm progress bar. Suitable for CPU-bound tasks.

    Parameters:
    - fn: Function to apply to each element (must be picklable)
    - *iterables: One or more iterables to process
    - max_workers: Maximum number of processes (default: cpu_count)
    - chunksize: Size of chunks for batching (default: 1)
    - **tqdm_kwargs: Additional arguments passed to the tqdm constructor

    Returns:
    List of results in the same order as the input
    """
```

## Usage Examples

### Basic Thread-Based Processing

```python
from tqdm.contrib.concurrent import thread_map
import requests

def fetch_url(url):
    """Simulate an I/O-bound task"""
    response = requests.get(url)
    return len(response.content)

# List of URLs to process
urls = [f"https://httpbin.org/delay/{i % 3}" for i in range(20)]

# Parallel processing with a progress bar
results = thread_map(
    fetch_url,
    urls,
    max_workers=5,
    desc="Fetching URLs",
    unit="req",
)

print(f"Downloaded {sum(results)} total bytes")
```

### Basic Process-Based Processing

```python
from tqdm.contrib.concurrent import process_map

def cpu_intensive_task(n):
    """Simulate a CPU-bound task: compute the prime factors of n"""
    factors = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            factors.append(d)
            n //= d
        d += 1
    if n > 1:
        factors.append(n)
    return factors

# Large numbers to factorize
numbers = [2**i + 1 for i in range(20, 40)]

# Parallel processing with a progress bar
results = process_map(
    cpu_intensive_task,
    numbers,
    max_workers=4,
    desc="Factoring",
    unit="num",
    chunksize=2,
)

for number, factors in zip(numbers, results):
    print(f"{number} = {' * '.join(map(str, factors))}")
```

### Advanced Thread Pool Management

```python
from tqdm.contrib.concurrent import thread_map
import time
import requests

def download_with_retry(url, max_retries=3):
    """Download with retry logic and exponential backoff"""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return {
                'url': url,
                'size': len(response.content),
                'attempt': attempt + 1
            }
        except Exception as e:
            if attempt == max_retries - 1:
                return {'url': url, 'error': str(e), 'attempt': attempt + 1}
            time.sleep(2 ** attempt)  # Exponential backoff

# Large batch of URLs; every tenth one returns HTTP 500
urls = [f"https://httpbin.org/status/{200 if i % 10 != 0 else 500}"
        for i in range(100)]

# Parallel download with progress tracking
results = thread_map(
    download_with_retry,
    urls,
    max_workers=10,
    desc="Downloading",
    unit="file",
    leave=True,
)

# Analyze results
successful = [r for r in results if 'error' not in r]
failed = [r for r in results if 'error' in r]

print(f"Downloaded: {len(successful)}, Failed: {len(failed)}")
if successful:
    total_size = sum(r['size'] for r in successful)
    avg_attempts = sum(r['attempt'] for r in successful) / len(successful)
    print(f"Total size: {total_size} bytes, Avg attempts: {avg_attempts:.1f}")
```

### Custom Process Pool with Progress

```python
from tqdm.contrib.concurrent import process_map
import multiprocessing as mp
import numpy as np

def monte_carlo_pi(n_samples):
    """Count random points in the unit square that land inside the unit circle"""
    np.random.seed()  # Ensure a different seed in each worker process
    points = np.random.uniform(-1, 1, (n_samples, 2))
    inside_circle = np.sum(np.sum(points**2, axis=1) <= 1)
    return inside_circle

def estimate_pi_parallel(total_samples, n_chunks=None):
    """Parallel Pi estimation with progress tracking"""
    if n_chunks is None:
        n_chunks = mp.cpu_count()

    # Split the samples evenly; the last chunk absorbs the remainder
    chunk_size = total_samples // n_chunks
    chunks = [chunk_size] * (n_chunks - 1) + [total_samples - chunk_size * (n_chunks - 1)]

    # Run Monte Carlo simulations in parallel
    inside_counts = process_map(
        monte_carlo_pi,
        chunks,
        desc="Estimating π",
        unit="chunk",
        max_workers=n_chunks,
    )

    total_inside = sum(inside_counts)
    pi_estimate = 4 * total_inside / total_samples

    return pi_estimate, total_inside, total_samples

# Estimate Pi with 10 million samples
pi_est, inside, total = estimate_pi_parallel(10_000_000, n_chunks=8)
error = abs(pi_est - np.pi) / np.pi * 100

print(f"Pi estimate: {pi_est:.6f}")
print(f"Actual Pi:   {np.pi:.6f}")
print(f"Error: {error:.4f}%")
print(f"Points inside circle: {inside:,} / {total:,}")
```

### Mixed Threading and Processing

```python
from tqdm.contrib.concurrent import thread_map, process_map
import requests
import time

def fetch_data(api_endpoint):
    """I/O-bound: fetch data from an API, returning None on failure"""
    try:
        response = requests.get(api_endpoint, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.RequestException:
        return None

def process_data(data_item):
    """CPU-bound: process fetched data"""
    # Simulate heavy computation
    result = {
        'id': data_item.get('id'),
        'processed_value': sum(ord(c) for c in str(data_item)) % 1000,
        'timestamp': time.time()
    }
    time.sleep(0.1)  # Simulate processing time
    return result

def hybrid_processing_pipeline(api_endpoints):
    """Pipeline combining I/O-bound and CPU-bound stages"""

    # Step 1: Fetch data in parallel (I/O-bound - use threads)
    print("Step 1: Fetching data from APIs...")
    raw_data = thread_map(
        fetch_data,
        api_endpoints,
        max_workers=10,
        desc="Fetching",
        unit="api",
    )

    # Filter out failed requests
    valid_data = [item for item in raw_data if item is not None]
    print(f"Successfully fetched {len(valid_data)} / {len(api_endpoints)} items")

    # Step 2: Process data in parallel (CPU-bound - use processes)
    print("Step 2: Processing data...")
    processed_data = process_map(
        process_data,
        valid_data,
        max_workers=4,
        desc="Processing",
        unit="item",
        chunksize=5,
    )

    return processed_data

# Example usage
api_urls = [f"https://jsonplaceholder.typicode.com/posts/{i}"
            for i in range(1, 21)]

results = hybrid_processing_pipeline(api_urls)
print(f"Pipeline completed. Processed {len(results)} items.")
```

### Error Handling and Resource Management

```python
from tqdm.contrib.concurrent import thread_map, process_map
import time
import random

def unreliable_task(item):
    """Task that sometimes fails"""
    # Simulate random failures
    if random.random() < 0.1:  # 10% failure rate
        raise ValueError(f"Task failed for item: {item}")

    # Simulate work
    time.sleep(random.uniform(0.1, 0.5))
    return item * 2

def safe_task(item):
    """Wrapper that catches exceptions.

    Defined at module level (not nested) so it stays picklable,
    which process_map requires for its worker processes.
    """
    try:
        return {'success': True, 'result': unreliable_task(item), 'item': item}
    except Exception as e:
        return {'success': False, 'error': str(e), 'item': item}

def robust_parallel_processing(items, use_processes=False):
    """Robust parallel processing with error handling and one retry pass"""

    # Choose processing method based on task type
    if use_processes:
        results = process_map(
            safe_task,
            items,
            max_workers=2,
            desc="Processing (multiprocess)",
            unit="item",
        )
    else:
        results = thread_map(
            safe_task,
            items,
            max_workers=5,
            desc="Processing (multithreaded)",
            unit="item",
        )

    # Separate successful and failed results
    successful = [r for r in results if r['success']]
    failed = [r for r in results if not r['success']]

    print(f"Successful: {len(successful)}, Failed: {len(failed)}")

    # Retry failed items once; transient failures often succeed on a second pass
    if failed:
        print("Retrying failed items...")
        retry_items = [r['item'] for r in failed]

        retry_results = thread_map(
            safe_task,
            retry_items,
            max_workers=2,
            desc="Retrying",
            unit="item",
        )

        retry_successful = [r for r in retry_results if r['success']]
        failed = [r for r in retry_results if not r['success']]

        print(f"Retry successful: {len(retry_successful)}, Still failed: {len(failed)}")
        successful.extend(retry_successful)

    return successful, failed

# Test with sample data
test_items = list(range(1, 51))
success, failures = robust_parallel_processing(test_items, use_processes=False)

print(f"\nFinal results: {len(success)} successful, {len(failures)} failed")
```

### Performance Monitoring and Optimization

```python
from tqdm.contrib.concurrent import thread_map, process_map
import time
import psutil
import threading

class PerformanceMonitor:
    """Monitor system resources during parallel processing"""

    def __init__(self, interval=1.0):
        self.interval = interval
        self.monitoring = False
        self.stats = {'cpu': [], 'memory': [], 'timestamps': []}
        self.thread = None

    def start(self):
        """Start monitoring system resources"""
        self.monitoring = True
        self.thread = threading.Thread(target=self._monitor, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop monitoring and return stats"""
        self.monitoring = False
        if self.thread:
            self.thread.join()
        return self.stats

    def _monitor(self):
        """Internal monitoring loop"""
        while self.monitoring:
            self.stats['cpu'].append(psutil.cpu_percent())
            self.stats['memory'].append(psutil.virtual_memory().percent)
            self.stats['timestamps'].append(time.time())
            time.sleep(self.interval)

def benchmark_processing_methods(items, task_func):
    """Compare thread_map vs process_map performance"""

    # Test thread-based processing
    print("Testing thread-based processing...")
    monitor = PerformanceMonitor()
    monitor.start()

    start_time = time.time()
    thread_results = thread_map(
        task_func,
        items,
        max_workers=4,
        desc="Thread-based",
        unit="item",
    )
    thread_time = time.time() - start_time
    thread_stats = monitor.stop()

    # Test process-based processing
    print("Testing process-based processing...")
    monitor = PerformanceMonitor()
    monitor.start()

    start_time = time.time()
    process_results = process_map(
        task_func,
        items,
        max_workers=4,
        desc="Process-based",
        unit="item",
    )
    process_time = time.time() - start_time
    process_stats = monitor.stop()

    # Compare results
    print("\nPerformance Comparison:")
    print(f"Thread-based: {thread_time:.2f}s")
    print(f"Process-based: {process_time:.2f}s")
    print(f"Thread/process time ratio: {thread_time / process_time:.2f}x")

    if thread_stats['cpu']:
        print("\nResource Usage (Thread-based):")
        print(f"  Average CPU: {sum(thread_stats['cpu']) / len(thread_stats['cpu']):.1f}%")
        print(f"  Average Memory: {sum(thread_stats['memory']) / len(thread_stats['memory']):.1f}%")

    if process_stats['cpu']:
        print("\nResource Usage (Process-based):")
        print(f"  Average CPU: {sum(process_stats['cpu']) / len(process_stats['cpu']):.1f}%")
        print(f"  Average Memory: {sum(process_stats['memory']) / len(process_stats['memory']):.1f}%")

    return thread_results, process_results

# Example CPU-bound task for benchmarking; module-level so it is picklable
def cpu_task(n):
    """Simple CPU-bound task"""
    return sum(i**2 for i in range(n))

# Benchmark with different workloads
small_items = [1000] * 20
large_items = [10000] * 20

print("Benchmarking small workload...")
benchmark_processing_methods(small_items, cpu_task)

print("\nBenchmarking large workload...")
benchmark_processing_methods(large_items, cpu_task)
```

## Best Practices

### Choosing Between Threads and Processes

**Use `thread_map` for:**

- I/O-bound tasks (file operations, network requests, database queries)
- Tasks that share data or state
- When memory usage is a concern
- Quick tasks with low computational overhead

**Use `process_map` for:**

- CPU-bound tasks (mathematical computations, data processing)
- Tasks that can be easily parallelized
- When maximum CPU utilization is needed
- Tasks that don't require shared state

### Performance Optimization

**Chunking Strategy:**

- Use a larger `chunksize` for small, fast tasks
- Use a smaller `chunksize` for large, variable-duration tasks
- Monitor memory usage with large chunks
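To see what chunking does, here is a minimal stdlib sketch (the `chunked` helper is hypothetical, not part of tqdm): each batch is handed to a worker as one unit, so larger chunks mean fewer scheduling and IPC round-trips but coarser load balancing.

```python
from itertools import islice

def chunked(iterable, chunksize):
    """Yield successive batches of at most chunksize items."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, chunksize))
        if not batch:
            return
        yield batch

# 10 tasks with chunksize=4 become 3 work units instead of 10:
batches = list(chunked(range(10), 4))
print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```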

**Worker Count:**

- Threads: usually `cpu_count + 4` for I/O-bound tasks
- Processes: usually `cpu_count` for CPU-bound tasks
- Adjust based on system resources and task characteristics
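The defaults above can be computed directly; this sketch mirrors the stdlib executor defaults (the thread formula matches `ThreadPoolExecutor` on Python 3.8+):

```python
import os

cpu_count = os.cpu_count() or 1

# Thread default mirrors ThreadPoolExecutor (Python 3.8+): min(32, cpu_count + 4)
default_threads = min(32, cpu_count + 4)

# Process default mirrors ProcessPoolExecutor: one worker per CPU
default_processes = cpu_count

print(f"threads: {default_threads}, processes: {default_processes}")
```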

**Error Handling:**

- Always wrap tasks in try/except for robust processing
- Consider retry mechanisms for transient failures
- Use the progress bar postfix to display error counts
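The error-counting pattern can be sketched with stdlib futures alone; `flaky` is a made-up task, and the commented line marks where a tqdm bar's `set_postfix` call would go:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def flaky(n):
    """Hypothetical task that fails on multiples of five"""
    if n % 5 == 0:
        raise ValueError(f"bad item: {n}")
    return n * 2

results, errors = [], 0
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(flaky, n) for n in range(10)]
    for future in as_completed(futures):
        try:
            results.append(future.result())
        except ValueError:
            errors += 1
        # With a tqdm bar: pbar.update(1); pbar.set_postfix(errors=errors)

print(f"{len(results)} ok, {errors} failed")  # 8 ok, 2 failed
```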

### Memory Management

- Be aware of memory multiplication in process pools
- Use generators or iterators for large datasets
- Monitor system resources during processing
- Consider streaming approaches for very large datasets
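Since `thread_map` materializes its full result list, very large datasets can fall back to the stdlib executor's lazy result iterator. A sketch (the `square` worker is hypothetical; note the executor still submits all tasks upfront, so truly bounded memory needs extra bookkeeping):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

def stream_results(items, max_workers=4):
    """Yield results one at a time instead of building a full list."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results lazily and in input order
        yield from pool.map(square, items)

total = 0
for value in stream_results(range(1000)):
    total += value

print(total)  # sum of squares 0..999
```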