# Parallel Processing

Embarrassingly parallel computing with readable list comprehension syntax. Supports multiple backends (threading, multiprocessing, loky, dask) with automatic backend selection, comprehensive configuration options, and optimizations for NumPy arrays and scientific computing workflows.

## Capabilities

### Parallel Execution

Main class for parallel computations using familiar list comprehension patterns with automatic load balancing and error handling.

```python { .api }
class Parallel(Logger):
    def __init__(self, n_jobs=None, backend=None, return_as="list", verbose=0, timeout=None,
                 pre_dispatch="2 * n_jobs", batch_size="auto", temp_folder=None,
                 max_nbytes="1M", mmap_mode="r", prefer=None, require=None, **backend_kwargs):
        """
        Create parallel execution context.

        Parameters:
        - n_jobs: int or None, number of jobs (None uses default config, -1 for all CPUs, 1 for sequential)
        - backend: str or None, execution backend (None uses default, "threading", "multiprocessing", "loky", "sequential", "dask")
        - return_as: str, return format ("list", "generator", "generator_unordered")
        - verbose: int, verbosity level (0=silent; higher values print more frequent progress messages; above 10 every completed task is reported)
        - timeout: float, timeout in seconds for each individual task to complete (supported only by process-based backends)
        - pre_dispatch: int or str, number of batches of tasks to pre-dispatch
        - batch_size: int or "auto", size of batches for parallel execution
        - temp_folder: str or None, temporary folder for memory mapping large arrays
        - max_nbytes: str or int, memory threshold for automatic memory mapping
        - mmap_mode: str, memory mapping mode ("r", "r+", "w+", "c")
        - prefer: str or None, backend preference hint ("threads", "processes")
        - require: str or None, backend requirement ("sharedmem")
        - **backend_kwargs: additional backend-specific parameters
        """

    def __call__(self, iterable):
        """
        Execute parallel computation.

        Parameters:
        - iterable: iterable of delayed objects or callables

        Returns:
        List of results or generator (based on return_as parameter)
        """

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with cleanup."""
```

**Usage Examples:**

```python
from joblib import Parallel, delayed
import numpy as np

# Basic parallel execution
def square(x):
    return x ** 2

# Process numbers in parallel
results = Parallel(n_jobs=4)(delayed(square)(i) for i in range(10))

# With progress tracking
results = Parallel(n_jobs=4, verbose=10)(delayed(square)(i) for i in range(100))

# Context manager usage
with Parallel(n_jobs=4) as parallel:
    batch1 = parallel(delayed(square)(i) for i in range(10))
    batch2 = parallel(delayed(square)(i) for i in range(10, 20))

# Memory mapping for large arrays
def process_array(arr):
    return np.sum(arr)

large_arrays = [np.random.random(10000) for _ in range(10)]
results = Parallel(n_jobs=4, max_nbytes='100M', mmap_mode='r')(
    delayed(process_array)(arr) for arr in large_arrays
)

# Backend-specific configuration
results = Parallel(n_jobs=4, backend='multiprocessing',
                   temp_folder='/tmp/joblib')(
    delayed(expensive_function)(i) for i in range(100)
)
```

### Delayed Function Wrapper

Decorator to capture function arguments for deferred parallel execution.

```python { .api }
def delayed(function):
    """
    Decorator to capture function and arguments for parallel execution.

    Parameters:
    - function: callable, function to wrap for delayed execution

    Returns:
    A wrapper callable; calling it with arguments produces a delayed
    task (the function plus its captured arguments) for Parallel to execute
    """
```

**Usage Examples:**

```python
from joblib import Parallel, delayed

# Basic delayed usage
@delayed
def process_item(item, multiplier=2):
    return item * multiplier

# Create delayed tasks
tasks = [process_item(i, multiplier=3) for i in range(10)]

# Execute in parallel
results = Parallel(n_jobs=4)(tasks)

# Alternative syntax without decorator
def compute(x, y):
    return x + y

tasks = [delayed(compute)(i, i*2) for i in range(10)]
results = Parallel(n_jobs=4)(tasks)

# Method calls
class DataProcessor:
    def process(self, data):
        return data ** 2

processor = DataProcessor()
tasks = [delayed(processor.process)(data) for data in datasets]
results = Parallel(n_jobs=4)(tasks)
```

Note: using `@delayed` as a decorator rebinds the module-level name to the wrapper, which can break pickling of the underlying function under process-based backends (multiprocessing, loky); the `delayed(func)(args)` call-site form is the safe choice in those cases.

### CPU and Job Management

Utilities for determining optimal parallelization settings and hardware capabilities.

```python { .api }
def cpu_count(only_physical_cores=False):
    """
    Return number of CPUs available.

    Parameters:
    - only_physical_cores: bool, count only physical cores (not hyperthreaded)

    Returns:
    int: Number of available CPUs
    """

def effective_n_jobs(n_jobs=-1):
    """
    Determine actual number of parallel jobs that will be used.

    Parameters:
    - n_jobs: int, requested number of jobs (-1 for all CPUs)

    Returns:
    int: Actual number of jobs that will be used
    """
```

**Usage Examples:**

```python
from joblib import cpu_count, effective_n_jobs

# Check available CPUs
total_cpus = cpu_count()
physical_cpus = cpu_count(only_physical_cores=True)

print(f"Total CPUs: {total_cpus}, Physical: {physical_cpus}")

# Determine effective job count
actual_jobs = effective_n_jobs(-1)  # All CPUs
half_jobs = effective_n_jobs(cpu_count() // 2)  # Half CPUs

# Use in Parallel configuration
optimal_jobs = min(len(data_batches), cpu_count())
results = Parallel(n_jobs=optimal_jobs)(tasks)
```

### Configuration Context Managers

Context managers for configuring parallel execution settings globally or locally.

```python { .api }
class parallel_config:
    def __init__(self, backend=None, *, n_jobs=None, verbose=0, temp_folder=None,
                 max_nbytes="1M", mmap_mode="r", prefer=None, require=None,
                 inner_max_num_threads=None, **backend_params):
        """
        Context manager to configure parallel execution globally.

        Parameters:
        - backend: str or None, default backend for Parallel objects (None uses system default)
        - n_jobs: int or None, default number of jobs (None uses system default)
        - verbose: int, default verbosity level (default: 0)
        - temp_folder: str or None, default temporary folder (None uses system default)
        - max_nbytes: str or int, default memory threshold (default: "1M")
        - mmap_mode: str, default memory mapping mode (default: "r")
        - prefer: str or None, backend preference hint (None uses system default)
        - require: str or None, backend requirement (None uses system default)
        - inner_max_num_threads: int or None, maximum threads for inner parallelism
        - **backend_params: additional backend parameters
        """

class parallel_backend(parallel_config):
    def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None, **backend_params):
        """
        Context manager to change default parallel backend.

        Retained for backward compatibility; prefer parallel_config in new code.

        Parameters:
        - backend: str or backend instance, parallel backend to use
        - n_jobs: int, number of jobs for this backend
        - inner_max_num_threads: int, thread limit for inner parallelism
        - **backend_params: additional backend-specific parameters
        """
```

**Usage Examples:**

```python
from joblib import Parallel, delayed, parallel_config, parallel_backend

# Global configuration
with parallel_config(backend='multiprocessing', n_jobs=4, verbose=10):
    # All Parallel calls use these settings
    result1 = Parallel()(delayed(func)(i) for i in range(10))
    result2 = Parallel()(delayed(func)(i) for i in range(20, 30))

# Backend-specific configuration
with parallel_backend('threading', n_jobs=2):
    result = Parallel()(delayed(io_bound_task)(i) for i in range(10))

with parallel_backend('multiprocessing', n_jobs=4):
    result = Parallel()(delayed(cpu_bound_task)(i) for i in range(10))

# Nested configuration
with parallel_config(verbose=10):
    with parallel_backend('loky', n_jobs=4):
        result = Parallel()(delayed(func)(i) for i in range(100))
```

### Backend Registration

Register custom parallel execution backends for specialized computing environments.

```python { .api }
def register_parallel_backend(name, factory, make_default=False):
    """
    Register a new parallel backend factory.

    Parameters:
    - name: str, backend name identifier
    - factory: callable, factory function returning backend instance
    - make_default: bool, whether to make this the default backend

    Raises:
    ValueError: If name already exists and factory is different
    """
```

**Usage Examples:**

```python
from joblib import register_parallel_backend, parallel_backend, Parallel, delayed
from joblib._parallel_backends import ParallelBackendBase

class CustomBackend(ParallelBackendBase):
    """Custom parallel backend implementation."""

    def effective_n_jobs(self, n_jobs):
        return min(n_jobs, 8)  # Limit to 8 jobs

    def submit(self, func, callback=None):
        # Custom job submission logic
        pass

    def retrieve_result(self, futures, timeout=None):
        # Custom result retrieval logic
        pass

# Register custom backend
register_parallel_backend('custom', CustomBackend)

# Use custom backend
with parallel_backend('custom'):
    results = Parallel()(delayed(func)(i) for i in range(10))

# Register external backend (e.g., Ray)
def create_ray_backend(**kwargs):
    from ray.util.joblib import register_ray
    return register_ray()

register_parallel_backend('ray', create_ray_backend)
```

## Advanced Parallel Patterns

### Error Handling and Debugging

```python
from joblib import Parallel, delayed

def may_fail(x):
    if x == 5:
        raise ValueError(f"Failed on {x}")
    return x ** 2

# Sequential execution for debugging
results = Parallel(n_jobs=1)(delayed(may_fail)(i) for i in range(10))

# Verbose output for monitoring
results = Parallel(n_jobs=4, verbose=50)(delayed(may_fail)(i) for i in range(10))
```

### Memory Management with Large Data

```python
import numpy as np
from joblib import Parallel, delayed

def process_large_array(arr):
    return np.mean(arr)

# Automatic memory mapping for large arrays
large_arrays = [np.random.random(1000000) for _ in range(10)]

results = Parallel(
    n_jobs=4,
    max_nbytes='1G',  # Trigger memory mapping for arrays above 1 GB
    mmap_mode='r',    # Read-only memory mapping
    temp_folder='/fast-storage/tmp'
)(delayed(process_large_array)(arr) for arr in large_arrays)
```

### Backend Selection Strategies

```python
import numpy as np
import requests

from joblib import Parallel, delayed

# I/O bound tasks - use threading
def download_file(url):
    return requests.get(url).content

urls = ['http://example.com/file{}.txt'.format(i) for i in range(10)]
results = Parallel(n_jobs=4, prefer='threads')(
    delayed(download_file)(url) for url in urls
)

# CPU bound tasks - use processes
def cpu_intensive(data):
    return np.fft.fft(data)

data_batches = [np.random.random(10000) for _ in range(10)]
results = Parallel(n_jobs=4, prefer='processes')(
    delayed(cpu_intensive)(batch) for batch in data_batches
)

# Shared memory requirement
def shared_memory_task(shared_array, index):
    return shared_array[index] * 2

results = Parallel(n_jobs=4, require='sharedmem')(
    delayed(shared_memory_task)(array, i) for i in range(len(array))
)
```