# Parallel Processing

Multi-threading and multi-processing utilities with simplified APIs, progress tracking, and seamless integration with fastcore's functional programming patterns. The parallel module provides enhanced executors and decorators for concurrent execution with improved error handling and debugging support.

## Capabilities

### Thread and Process Decorators

Simple decorators to convert functions for concurrent execution with automatic result handling.

```python { .api }
def threaded(process=False):
    """
    Decorator to run a function in a Thread or Process.

    Converts a function so that calling it runs the body asynchronously in a
    separate thread or process. The call returns the already-started
    Thread/Process object; after join(), its 'result' attribute holds the
    function's return value.

    Parameters:
    - process: bool, use Process instead of Thread if True (default: False)

    Returns:
    Decorator that wraps functions for concurrent execution

    Usage:
    @threaded
    def compute(): return expensive_calculation()

    @threaded(process=True)
    def cpu_intensive(): return heavy_computation()
    """

def startthread(f):
    """
    Like threaded, but start the thread immediately.

    Runs the decorated function in a Thread as soon as the decorator is
    applied, rather than when the function is later called; the decorated
    name is bound to the started Thread.

    Parameters:
    - f: function to run in thread

    Returns:
    Started Thread object with result attribute
    """

def startproc(f):
    """
    Like threaded(True), but start the Process immediately.

    Runs the decorated function in a Process as soon as the decorator is
    applied, providing instant execution.

    Parameters:
    - f: function to run in process

    Returns:
    Started Process object with result attribute
    """
```
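
The practical difference between `threaded` and `startthread` is *when* the thread begins. A minimal sketch, assuming the start-at-decoration behaviour described above (the helper names here are illustrative, not part of the API):

```python
from fastcore.parallel import threaded, startthread
import time

@threaded
def delayed(msg):
    time.sleep(0.1)
    return msg

t = delayed("hi")   # thread starts here, when the decorated function is called
t.join()            # wait for completion before reading the result
print(t.result)     # 'hi'

@startthread
def announce():     # thread starts here, at decoration time
    print("started immediately")

announce.join()     # the decorated name is the already-running Thread
```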

### Enhanced Executor Classes

Improved ThreadPoolExecutor and ProcessPoolExecutor with better error handling and serial execution support.

```python { .api }
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
    """
    Enhanced ThreadPoolExecutor with serial execution support and error handling.

    Extends Python's ThreadPoolExecutor with the ability to run serially
    (max_workers=0), better exception handling, and pause functionality
    for rate limiting.

    Parameters:
    - max_workers: int, number of worker threads (0 for serial, None for CPU count)
    - on_exc: callable, exception handler function (default: print)
    - pause: float, seconds to pause between operations (default: 0)
    - **kwargs: additional arguments passed to parent class

    Features:
    - Serial execution when max_workers=0 (useful for debugging)
    - Automatic exception handling and reporting
    - Built-in rate limiting with pause parameter
    - Thread-safe operation with Manager().Lock()
    """

    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs): ...

    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
        """
        Enhanced map with error handling and rate limiting.

        Parameters:
        - f: function to apply to each item
        - items: iterable of items to process
        - *args: additional arguments for f
        - timeout: float, timeout for operations
        - chunksize: int, items per chunk
        - **kwargs: keyword arguments for f

        Returns:
        Iterator of results
        """

class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
    """
    Enhanced ProcessPoolExecutor with serial execution support.

    Extends Python's ProcessPoolExecutor with the same enhancements
    as ThreadPoolExecutor, including serial execution mode and
    improved error handling.

    Parameters:
    - max_workers: int, number of worker processes (0 for serial)
    - on_exc: callable, exception handler function
    - pause: float, seconds to pause between operations
    - **kwargs: additional arguments passed to parent class

    Note: Serial execution (max_workers=0) is particularly useful for
    debugging multiprocessing code without the complexity of separate processes.
    """

    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs): ...
    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs): ...
```
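
The `chunksize` argument on `map` is passed through to the underlying `concurrent.futures` pool; for many small work items a larger chunk amortizes inter-process overhead. A brief sketch (the `double` helper is illustrative):

```python
from fastcore.parallel import ProcessPoolExecutor

def double(x): return x * 2

# Send items to worker processes in batches of 256 rather than one at a time
with ProcessPoolExecutor(max_workers=4) as ex:
    results = list(ex.map(double, range(10_000), chunksize=256))
```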

### High-Level Parallel Functions

Convenient functions for parallel execution with automatic executor management.

```python { .api }
def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None,
             pause=0, method=None, timeout=None, chunksize=1, **kwargs):
    """
    Parallel execution of function over items with progress tracking.

    High-level interface for parallel processing that handles executor
    creation, progress tracking, and result collection automatically.

    Parameters:
    - f: function to apply to each item
    - items: iterable of items to process
    - *args: additional positional arguments for f
    - n_workers: int, number of workers (0 for serial execution)
    - total: int, total items for progress tracking
    - progress: bool, display a progress bar (requires the fastprogress package)
    - pause: float, seconds between operations
    - method: str, multiprocessing start method (e.g. 'fork', 'spawn'); None uses the platform default
    - timeout: float, timeout for operations
    - chunksize: int, items per chunk for process pools
    - **kwargs: keyword arguments for f

    Returns:
    L: list of results from applying f to each item
    """

def parallel_async(f, items, *args, **kwargs):
    """
    Async version of parallel execution.

    Provides asynchronous parallel execution for async functions
    with the same interface as the synchronous parallel function.

    Parameters:
    - f: async function to apply
    - items: iterable of items
    - *args: positional arguments for f
    - **kwargs: keyword arguments (same as parallel)

    Returns:
    Awaitable that resolves to an L of results
    """

def run_procs(f, f_done, args):
    """
    Run processes and yield results from a completion generator.

    Starts one Process per argument tuple in args, then yields the values
    produced by the f_done generator (which typically reads results from a
    shared queue), joining the processes afterwards.

    Parameters:
    - f: function to run in each process
    - f_done: callable returning a generator of results, consumed once all processes have started
    - args: list of argument tuples, one per process
    """

def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
    """
    Generate results in parallel using a callable class.

    Instantiates cls in each worker and calls the instance on a subset of
    items, which is useful for data loading and transformation pipelines.

    Parameters:
    - cls: callable class applied to batches of items
    - items: items to process
    - n_workers: int, number of workers
    - **kwargs: additional arguments for cls constructor

    Yields:
    (index, result) pairs, one per processed item
    """
```

### Utility Functions

Helper functions for parallel processing setup and compatibility checking.

```python { .api }
def parallelable(param_name, num_workers, f=None):
    """
    Check if function can be parallelized in current environment.

    Determines whether parallel processing is available considering
    platform limitations, notebook environments, and function location.

    Parameters:
    - param_name: str, name of parameter being checked
    - num_workers: int, requested number of workers
    - f: function, function to check (optional)

    Returns:
    bool: True if parallelization is possible

    Note: Returns False and prints warning for Windows + Jupyter + main module functions
    """
```
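
As a rough sketch of how this can guard a call site (the fall-back-to-serial pattern here is an assumption, mirroring how fastcore itself downgrades to serial execution; `transform` is a placeholder):

```python
from fastcore.parallel import parallelable, parallel

def transform(x): return x * 2

n_workers = 4
# Fall back to serial execution when worker processes aren't supported
# (e.g. functions defined in __main__ under Jupyter on Windows)
if not parallelable('n_workers', n_workers, transform): n_workers = 0

results = parallel(transform, range(8), n_workers=n_workers)
```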

## Usage Examples

### Basic Thread and Process Execution

```python
from fastcore.parallel import threaded, startthread, startproc
import time

# Simple threaded function
@threaded
def slow_calculation(x):
    time.sleep(1)
    return x ** 2

# Start the computation and get the result later: calling the decorated
# function returns the running Thread
thread = slow_calculation(5)
# Do other work...
thread.join()           # wait for the thread to finish
result = thread.result  # 25

# Immediate thread execution: the thread starts at decoration time, and the
# decorated name is bound to the running Thread object
@startthread
def background_task():
    time.sleep(2)
    print("Background task completed")
    return "done"

# Continues in the background; join to wait for it
background_task.join()

# Process-based computation for CPU-intensive work
@threaded(process=True)
def cpu_intensive_task(data):
    # Heavy computation that benefits from a separate process
    return sum(x*x for x in range(data))

proc = cpu_intensive_task(1000000)
proc.join()             # wait for the process to finish
result = proc.result
```

### Enhanced Executors

```python
from fastcore.parallel import ThreadPoolExecutor, ProcessPoolExecutor
import requests

# Thread pool with error handling and rate limiting
def fetch_url(url):
    response = requests.get(url)
    return response.status_code

urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/404"
]

# Serial execution for debugging (max_workers=0)
with ThreadPoolExecutor(max_workers=0, pause=0.5) as executor:
    results = list(executor.map(fetch_url, urls))

# Parallel execution with error handling
def safe_fetch(url):
    try:
        return requests.get(url, timeout=5).status_code
    except Exception as e:
        return f"Error: {e}"

with ThreadPoolExecutor(max_workers=3, on_exc=print) as executor:
    results = list(executor.map(safe_fetch, urls))

# Process pool for CPU-intensive tasks
def compute_fibonacci(n):
    if n <= 1: return n
    return compute_fibonacci(n-1) + compute_fibonacci(n-2)

numbers = [30, 31, 32, 33, 34]
with ProcessPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(compute_fibonacci, numbers))
```

### High-Level Parallel Processing

```python
from fastcore.parallel import parallel
from fastcore.foundation import L
import time

# Simple parallel map
def square(x):
    time.sleep(0.1)  # Simulate work
    return x ** 2

numbers = range(10)
results = parallel(square, numbers, n_workers=4)
print(results)  # (#10) [0,1,4,9,16,25,36,49,64,81]

# Parallel with progress tracking (progress=True requires the fastprogress package)
def slow_process(item):
    time.sleep(0.2)
    return item * 10

data = range(20)
results = parallel(
    slow_process,
    data,
    n_workers=4,
    progress=True,  # Shows progress bar
    total=len(data)
)

# Serial execution for debugging
debug_results = parallel(
    slow_process,
    data[:5],
    n_workers=0  # Serial execution
)

# Parallel with additional keyword arguments forwarded to f
def process_with_params(item, multiplier=1, offset=0):
    return item * multiplier + offset

results = parallel(
    process_with_params,
    numbers,
    n_workers=3,
    multiplier=2,  # keyword arguments are passed through to f
    offset=10
)
```

### Advanced Parallel Patterns

```python
from fastcore.parallel import run_procs, parallel_async, parallel_gen
from multiprocessing import Queue
import asyncio, time

# run_procs starts one Process per argument tuple in `args`, then yields the
# values produced by the `f_done` generator (here, read from a shared queue).
# This sketch assumes a fork-style start method so the queue is inherited.
queue = Queue()

def worker_func(lo, hi, queue=queue):
    # Process one chunk of data and put the result on the shared queue
    queue.put(sum(x*x for x in range(lo, hi)))

# One (lo, hi) argument tuple per worker process
data_chunks = [(i*1000, (i+1)*1000) for i in range(4)]

def collect_results():
    # Generator consumed by run_procs once all workers have started
    for _ in data_chunks: yield queue.get()

for result in run_procs(worker_func, collect_results, data_chunks):
    print(f"Chunk processed with result: {result}")

# Async parallel processing
async def async_fetch(url):
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = ["https://httpbin.org/uuid" for _ in range(5)]
    results = await parallel_async(async_fetch, urls, n_workers=3)
    return results

# Run async parallel
results = asyncio.run(main())

# Parallel generation with a callable class: parallel_gen instantiates the
# class in each worker, calls the instance on a batch of items, and yields
# (index, result) pairs
class DataProcessor:
    def __call__(self, items):
        for item in items:
            time.sleep(0.1)  # simulate expensive processing
            yield item.upper() if isinstance(item, str) else str(item)

raw_items = ["hello", "world", "fastcore", "parallel"]

for idx, processed in parallel_gen(DataProcessor, raw_items, n_workers=2):
    print(idx, processed)
```

### Error Handling and Debugging

```python
from fastcore.parallel import parallel, ThreadPoolExecutor
import random

def risky_function(x):
    if random.random() < 0.2:  # 20% chance of error
        raise ValueError(f"Random error with {x}")
    return x * 2

# Custom error handler
def log_error(exc):
    print(f"Caught exception: {type(exc).__name__}: {exc}")

# Parallel with error handling
try:
    results = parallel(
        risky_function,
        range(20),
        n_workers=4
    )
except Exception as e:
    print(f"Parallel execution failed: {e}")

# Using executor with custom error handler
with ThreadPoolExecutor(max_workers=4, on_exc=log_error) as executor:
    # Errors are logged but don't stop processing
    results = list(executor.map(risky_function, range(20)))

# Serial debugging mode
def debug_function(x):
    print(f"Processing {x}")
    if x == 5:
        breakpoint()  # Debugger will work in serial mode
    return x ** 2

# Debug with serial execution
debug_results = parallel(
    debug_function,
    range(10),
    n_workers=0  # Serial - debugger friendly
)
```

### Integration with FastCore Collections

```python
from fastcore.parallel import parallel
from fastcore.foundation import L

# Parallel processing with L collections
data = L(range(100))

# Named functions are used for the parallel calls below because the default
# process-based workers need picklable callables (lambdas can't be pickled)
def square(x): return x ** 2
def triple(x): return x * 3

# Parallel map maintaining L type
squared = data.map(square)  # Serial map
parallel_squared = parallel(square, data, n_workers=4)  # Parallel

# Filter then parallel process
filtered_data = data.filter(lambda x: x % 2 == 0)  # filtering stays in-process
results = parallel(triple, filtered_data, n_workers=3)

# Parallel processing with complex transformations
def complex_transform(item):
    # Simulate complex processing
    import time
    time.sleep(0.01)
    return {
        'original': item,
        'squared': item ** 2,
        'cubed': item ** 3
    }

# Process in parallel, convert back to L
transformed = L(parallel(
    complex_transform,
    data[:20],
    n_workers=4
))

# Extract specific fields in parallel (named helpers keep the callables picklable)
def get_original(d): return d['original']
def get_squared(d): return d['squared']

originals = parallel(get_original, transformed, n_workers=2)
squares = parallel(get_squared, transformed, n_workers=2)
```