# Parallel Processing

Multi-threading and multi-processing utilities with simplified APIs, progress tracking, and seamless integration with fastcore's functional programming patterns. The parallel module provides enhanced executors and decorators for concurrent execution with improved error handling and debugging support.

## Capabilities

### Thread and Process Decorators

Simple decorators to convert functions for concurrent execution with automatic result handling.

```python { .api }
def threaded(process=False):
    """
    Decorator to run function in Thread or Process.

    Converts a function to run asynchronously in a separate thread or process.
    The decorated function returns the Thread/Process object with a 'result'
    attribute containing the function's return value.

    Parameters:
    - process: bool, use Process instead of Thread if True (default: False)

    Returns:
    Decorator that wraps functions for concurrent execution

    Usage:
    @threaded
    def compute(): return expensive_calculation()

    @threaded(process=True)
    def cpu_intensive(): return heavy_computation()
    """

def startthread(f):
    """
    Like threaded, but start thread immediately.

    Decorator that immediately starts the thread when the decorated
    function is called, rather than requiring manual .start().

    Parameters:
    - f: function to run in thread

    Returns:
    Started Thread object with result attribute
    """

def startproc(f):
    """
    Like threaded(True), but start Process immediately.

    Decorator that immediately starts the process when the decorated
    function is called, providing instant execution.

    Parameters:
    - f: function to run in process

    Returns:
    Started Process object with result attribute
    """
```

### Enhanced Executor Classes

Improved ThreadPoolExecutor and ProcessPoolExecutor with better error handling and serial execution support.

```python { .api }
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
    """
    Enhanced ThreadPoolExecutor with serial execution support and error handling.

    Extends Python's ThreadPoolExecutor with the ability to run serially
    (max_workers=0), better exception handling, and pause functionality
    for rate limiting.

    Parameters:
    - max_workers: int, number of worker threads (0 for serial, None for CPU count)
    - on_exc: callable, exception handler function (default: print)
    - pause: float, seconds to pause between operations (default: 0)
    - **kwargs: additional arguments passed to parent class

    Features:
    - Serial execution when max_workers=0 (useful for debugging)
    - Automatic exception handling and reporting
    - Built-in rate limiting with pause parameter
    - Thread-safe operation with Manager().Lock()
    """

    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs): ...

    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
        """
        Enhanced map with error handling and rate limiting.

        Parameters:
        - f: function to apply to each item
        - items: iterable of items to process
        - *args: additional arguments for f
        - timeout: float, timeout for operations
        - chunksize: int, items per chunk
        - **kwargs: keyword arguments for f

        Returns:
        Iterator of results
        """

class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
    """
    Enhanced ProcessPoolExecutor with serial execution support.

    Extends Python's ProcessPoolExecutor with the same enhancements
    as ThreadPoolExecutor, including serial execution mode and
    improved error handling.

    Parameters:
    - max_workers: int, number of worker processes (0 for serial)
    - on_exc: callable, exception handler function
    - pause: float, seconds to pause between operations
    - **kwargs: additional arguments passed to parent class

    Note: Serial execution (max_workers=0) is particularly useful for
    debugging multiprocessing code without the complexity of separate processes.
    """

    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs): ...
    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs): ...
```

### High-Level Parallel Functions

Convenient functions for parallel execution with automatic executor management.

```python { .api }
def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None,
             pause=0, method=map, timeout=None, chunksize=1, **kwargs):
    """
    Parallel execution of function over items with progress tracking.

    High-level interface for parallel processing that handles executor
    creation, progress tracking, and result collection automatically.

    Parameters:
    - f: function to apply to each item
    - items: iterable of items to process
    - *args: additional positional arguments for f
    - n_workers: int, number of workers (0 for serial execution)
    - total: int, total items for progress tracking
    - progress: bool|callable, progress display (True for default, callable for custom)
    - pause: float, seconds between operations
    - method: callable, execution method (map, starmap, etc.)
    - timeout: float, timeout for operations
    - chunksize: int, items per chunk for process pools
    - **kwargs: keyword arguments for f

    Returns:
    L: List of results from applying f to each item
    """

def parallel_async(f, items, *args, **kwargs):
    """
    Async version of parallel execution.

    Provides asynchronous parallel execution for async functions
    with the same interface as the synchronous parallel function.

    Parameters:
    - f: async function to apply
    - items: iterable of items
    - *args: positional arguments for f
    - **kwargs: keyword arguments (same as parallel)

    Returns:
    Awaitable that resolves to L of results
    """

def run_procs(f, f_done, args, n_workers=1):
    """
    Run processes with completion callbacks.

    Execute function in multiple processes with callback functions
    that are called when each process completes.

    Parameters:
    - f: function to run in each process
    - f_done: callback function called when process completes
    - args: list of argument tuples for each process
    - n_workers: int, number of concurrent processes
    """

def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
    """
    Generate items in parallel using class methods.

    Create instances of cls by processing items in parallel,
    useful for data loading and transformation pipelines.

    Parameters:
    - cls: class to instantiate
    - items: items to process
    - n_workers: int, number of workers
    - **kwargs: additional arguments for cls constructor

    Yields:
    Instances of cls created from processed items
    """
```
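
The `pause` parameter documented above can be used to throttle calls to rate-limited services. A minimal sketch using only parameters from this block (the `timestamped` helper is illustrative):

```python
from fastcore.parallel import parallel
import time

def timestamped(x):
    # Record when each item was processed so the pacing is visible
    return (x, time.time())

# pause inserts a delay between operations, spacing out the calls
results = parallel(timestamped, range(5), n_workers=2, pause=0.25)
```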

### Utility Functions

Helper functions for parallel processing setup and compatibility checking.

```python { .api }
def parallelable(param_name, num_workers, f=None):
    """
    Check if function can be parallelized in current environment.

    Determines whether parallel processing is available considering
    platform limitations, notebook environments, and function location.

    Parameters:
    - param_name: str, name of parameter being checked
    - num_workers: int, requested number of workers
    - f: function, function to check (optional)

    Returns:
    bool: True if parallelization is possible

    Note: Returns False and prints warning for Windows + Jupyter + main module functions
    """
```
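
`parallelable` can be used to fall back to serial execution before committing to worker processes. A minimal sketch based on the signature above (the `work` helper is illustrative):

```python
from fastcore.parallel import parallelable, parallel

def work(x):
    return x * 2

n_workers = 4
# parallelable returns False (and prints a warning) when parallel execution
# is not supported, e.g. for __main__ functions under Windows + Jupyter
if not parallelable('n_workers', n_workers, work):
    n_workers = 0  # fall back to serial execution

results = parallel(work, range(8), n_workers=n_workers)
```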

## Usage Examples

### Basic Thread and Process Execution

```python
from fastcore.parallel import threaded, startthread, startproc
import time

# Simple threaded function
@threaded
def slow_calculation(x):
    time.sleep(1)
    return x ** 2

# Start thread and get result later
thread = slow_calculation(5)
thread.start()
# Do other work...
thread.join()           # wait for the thread to finish before reading the result
result = thread.result  # 25

# Immediate thread execution
@startthread
def background_task():
    time.sleep(2)
    print("Background task completed")
    return "done"

# Thread starts immediately, continues in background
thread = background_task()

# Process-based computation for CPU-intensive work
@threaded(process=True)
def cpu_intensive_task(data):
    # Heavy computation that benefits from a separate process
    return sum(x*x for x in range(data))

proc = cpu_intensive_task(1000000)
proc.start()
proc.join()             # wait for the process to finish
result = proc.result
```

### Enhanced Executors

```python
from fastcore.parallel import ThreadPoolExecutor, ProcessPoolExecutor
import requests

# Thread pool with error handling and rate limiting
def fetch_url(url):
    response = requests.get(url)
    return response.status_code

urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/404"
]

# Serial execution for debugging (max_workers=0)
with ThreadPoolExecutor(max_workers=0, pause=0.5) as executor:
    results = list(executor.map(fetch_url, urls))

# Parallel execution with error handling
def safe_fetch(url):
    try:
        return requests.get(url, timeout=5).status_code
    except Exception as e:
        return f"Error: {e}"

with ThreadPoolExecutor(max_workers=3, on_exc=print) as executor:
    results = list(executor.map(safe_fetch, urls))

# Process pool for CPU-intensive tasks
def compute_fibonacci(n):
    if n <= 1: return n
    return compute_fibonacci(n-1) + compute_fibonacci(n-2)

numbers = [30, 31, 32, 33, 34]
with ProcessPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(compute_fibonacci, numbers))
```
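
Unlike the standard-library executors, the enhanced `map` documented above also forwards extra keyword arguments to `f`. A minimal sketch assuming that documented behaviour (the `scale` helper is illustrative):

```python
from fastcore.parallel import ThreadPoolExecutor

def scale(x, factor=1, offset=0):
    return x * factor + offset

# Keyword arguments other than timeout/chunksize are passed through to scale
with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(scale, range(5), factor=3, offset=1))
# expected: [1, 4, 7, 10, 13]
```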

### High-Level Parallel Processing

```python
from fastcore.parallel import parallel, parallel_gen
from fastcore.foundation import L
import time

# Simple parallel map
def square(x):
    time.sleep(0.1)  # Simulate work
    return x ** 2

numbers = range(10)
results = parallel(square, numbers, n_workers=4)
print(results)  # L containing [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# Parallel with progress tracking
# (progress=True displays a progress bar; requires the fastprogress package)
def slow_process(item):
    time.sleep(0.2)
    return item * 10

data = range(20)
results = parallel(
    slow_process,
    data,
    n_workers=4,
    progress=True,  # Shows progress bar
    total=len(data)
)

# Serial execution for debugging
debug_results = parallel(
    slow_process,
    data[:5],
    n_workers=0  # Serial execution
)

# Parallel with additional arguments and kwargs
def process_with_params(item, multiplier=1, offset=0):
    return item * multiplier + offset

results = parallel(
    process_with_params,
    numbers,
    2,            # multiplier argument
    n_workers=3,
    offset=10     # keyword argument
)
```

### Advanced Parallel Patterns

```python
from fastcore.parallel import run_procs, parallel_async, parallel_gen
import asyncio
import time

# Process with completion callbacks
def worker_func(data_chunk):
    # Process a chunk of data
    result = sum(x*x for x in data_chunk)
    return result

def completion_callback(result):
    print(f"Chunk processed with result: {result}")

# Split work into chunks
data_chunks = [range(i*1000, (i+1)*1000) for i in range(4)]

run_procs(
    worker_func,
    completion_callback,
    data_chunks,
    n_workers=2
)

# Async parallel processing
async def async_fetch(url):
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = ["https://httpbin.org/uuid" for _ in range(5)]
    results = await parallel_async(async_fetch, urls, n_workers=3)
    return results

# Run async parallel
results = asyncio.run(main())

# Parallel object generation
class DataProcessor:
    def __init__(self, raw_data):
        self.processed = self.expensive_process(raw_data)

    def expensive_process(self, data):
        # Simulate expensive processing
        time.sleep(0.1)
        return data.upper() if isinstance(data, str) else str(data)

raw_items = ["hello", "world", "fastcore", "parallel"]

# Process items in parallel to create objects
processors = list(parallel_gen(
    DataProcessor,
    raw_items,
    n_workers=2
))

for proc in processors:
    print(proc.processed)
```

### Error Handling and Debugging

```python
from fastcore.parallel import parallel, ThreadPoolExecutor
import random

def risky_function(x):
    if random.random() < 0.2:  # 20% chance of error
        raise ValueError(f"Random error with {x}")
    return x * 2

# Custom error handler
def log_error(exc):
    print(f"Caught exception: {type(exc).__name__}: {exc}")

# Parallel with error handling
try:
    results = parallel(
        risky_function,
        range(20),
        n_workers=4
    )
except Exception as e:
    print(f"Parallel execution failed: {e}")

# Using executor with custom error handler
with ThreadPoolExecutor(max_workers=4, on_exc=log_error) as executor:
    # Errors are logged but don't stop processing
    results = list(executor.map(risky_function, range(20)))

# Serial debugging mode
def debug_function(x):
    print(f"Processing {x}")
    if x == 5:
        breakpoint()  # Debugger will work in serial mode
    return x ** 2

# Debug with serial execution
debug_results = parallel(
    debug_function,
    range(10),
    n_workers=0  # Serial - debugger friendly
)
```

### Integration with FastCore Collections

```python
from fastcore.parallel import parallel
from fastcore.foundation import L
import time

# Parallel processing with L collections
data = L(range(100))

# Parallel map maintaining L type
# Note: if parallel uses process-based workers, lambdas may fail to pickle;
# switch to module-level functions (or n_workers=0) in that case.
squared = data.map(lambda x: x**2)                              # Serial map
parallel_squared = parallel(lambda x: x**2, data, n_workers=4)  # Parallel

# Filter then parallel process
filtered_data = data.filter(lambda x: x % 2 == 0)
results = parallel(
    lambda x: x * 3,
    filtered_data,
    n_workers=3
)

# Parallel processing with complex transformations
def complex_transform(item):
    # Simulate complex processing
    time.sleep(0.01)
    return {
        'original': item,
        'squared': item ** 2,
        'cubed': item ** 3
    }

# Process in parallel, convert back to L
transformed = L(parallel(
    complex_transform,
    data[:20],
    n_workers=4
))

# Extract specific fields in parallel
originals = parallel(lambda x: x['original'], transformed, n_workers=2)
squares = parallel(lambda x: x['squared'], transformed, n_workers=2)
```