or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md · memory-caching.md · parallel-processing.md · persistence-serialization.md · utilities-infrastructure.md

docs/parallel-processing.md

0

# Parallel Processing

1

2

Embarrassingly parallel computing with readable list comprehension syntax. Supports multiple backends (threading, multiprocessing, loky, dask) with automatic backend selection, comprehensive configuration options, and optimizations for NumPy arrays and scientific computing workflows.

3

4

## Capabilities

5

6

### Parallel Execution

7

8

Main class for parallel computations using familiar list comprehension patterns with automatic load balancing and error handling.

9

10

```python { .api }

11

class Parallel(Logger):

12

def __init__(self, n_jobs=None, backend=None, return_as="list", verbose=0, timeout=None,

13

pre_dispatch="2 * n_jobs", batch_size="auto", temp_folder=None,

14

max_nbytes="1M", mmap_mode="r", prefer=None, require=None, **backend_kwargs):

15

"""

16

Create parallel execution context.

17

18

Parameters:

19

- n_jobs: int or None, number of jobs (None uses default config, -1 for all CPUs, 1 for sequential)

20

- backend: str or None, execution backend (None uses default, "threading", "multiprocessing", "loky", "sequential", "dask")

21

- return_as: str, return format ("list", "generator", "generator_unordered")

22

- verbose: int, verbosity level (0=silent; higher values print more progress messages; above 50, output is sent to stdout)

23

- timeout: float, timeout in seconds for each task to complete (a TimeoutError is raised if any task exceeds it)

24

- pre_dispatch: int or str, number of batches to pre-dispatch

25

- batch_size: int or "auto", size of batches for parallel execution

26

- temp_folder: str or None, temporary folder for memory mapping large arrays

27

- max_nbytes: str or int, memory threshold for automatic memory mapping

28

- mmap_mode: str, memory mapping mode ("r", "r+", "w+", "c")

29

- prefer: str or None, backend preference hint ("threads", "processes")

30

- require: str or None, backend requirement ("sharedmem")

31

- **backend_kwargs: additional backend-specific parameters

32

"""

33

34

def __call__(self, iterable):

35

"""

36

Execute parallel computation.

37

38

Parameters:

39

- iterable: iterable of delayed objects or callables

40

41

Returns:

42

List of results or generator (based on return_as parameter)

43

"""

44

45

def __enter__(self):

46

"""Context manager entry."""

47

return self

48

49

def __exit__(self, exc_type, exc_val, exc_tb):

50

"""Context manager exit with cleanup."""

51

```

52

53

**Usage Examples:**

54

55

```python

56

from joblib import Parallel, delayed

57

import numpy as np

58

59

# Basic parallel execution

60

def square(x):

61

return x ** 2

62

63

# Process numbers in parallel

64

results = Parallel(n_jobs=4)(delayed(square)(i) for i in range(10))

65

66

# With progress tracking

67

results = Parallel(n_jobs=4, verbose=10)(delayed(square)(i) for i in range(100))

68

69

# Context manager usage

70

with Parallel(n_jobs=4) as parallel:

71

batch1 = parallel(delayed(square)(i) for i in range(10))

72

batch2 = parallel(delayed(square)(i) for i in range(10, 20))

73

74

# Memory mapping for large arrays

75

def process_array(arr):

76

return np.sum(arr)

77

78

large_arrays = [np.random.random(10000) for _ in range(10)]

79

results = Parallel(n_jobs=4, max_nbytes='100M', mmap_mode='r')(

80

delayed(process_array)(arr) for arr in large_arrays

81

)

82

83

# Backend-specific configuration

84

results = Parallel(n_jobs=4, backend='multiprocessing',

85

temp_folder='/tmp/joblib')(

86

delayed(expensive_function)(i) for i in range(100)

87

)

88

```

89

90

### Delayed Function Wrapper

91

92

Decorator to capture function arguments for deferred parallel execution.

93

94

```python { .api }

95

def delayed(function):

96

"""

97

Decorator to capture function and arguments for parallel execution.

98

99

Parameters:

100

- function: callable, function to wrap for delayed execution

101

102

Returns:

103

A wrapped function that, when called, returns a (function, args, kwargs) task tuple consumable by Parallel

104

"""

105

```

106

107

**Usage Examples:**

108

109

```python

110

from joblib import Parallel, delayed

111

112

# Basic delayed usage

113

@delayed

114

def process_item(item, multiplier=2):

115

return item * multiplier

116

117

# Create delayed tasks

118

tasks = [process_item(i, multiplier=3) for i in range(10)]

119

120

# Execute in parallel

121

results = Parallel(n_jobs=4)(tasks)

122

123

# Alternative syntax without decorator

124

def compute(x, y):

125

return x + y

126

127

tasks = [delayed(compute)(i, i*2) for i in range(10)]

128

results = Parallel(n_jobs=4)(tasks)

129

130

# Method calls

131

class DataProcessor:

132

def process(self, data):

133

return data ** 2

134

135

processor = DataProcessor()

136

tasks = [delayed(processor.process)(data) for data in datasets]

137

results = Parallel(n_jobs=4)(tasks)

138

```

139

140

### CPU and Job Management

141

142

Utilities for determining optimal parallelization settings and hardware capabilities.

143

144

```python { .api }

145

def cpu_count(only_physical_cores=False):

146

"""

147

Return number of CPUs available.

148

149

Parameters:

150

- only_physical_cores: bool, count only physical cores (not hyperthreaded)

151

152

Returns:

153

int: Number of available CPUs

154

"""

155

156

def effective_n_jobs(n_jobs=-1):

157

"""

158

Determine actual number of parallel jobs that will be used.

159

160

Parameters:

161

- n_jobs: int, requested number of jobs (-1 for all CPUs)

162

163

Returns:

164

int: Actual number of jobs that will be used

165

"""

166

```

167

168

**Usage Examples:**

169

170

```python

171

from joblib import cpu_count, effective_n_jobs

172

173

# Check available CPUs

174

total_cpus = cpu_count()

175

physical_cpus = cpu_count(only_physical_cores=True)

176

177

print(f"Total CPUs: {total_cpus}, Physical: {physical_cpus}")

178

179

# Determine effective job count

180

actual_jobs = effective_n_jobs(-1) # All CPUs

181

half_jobs = effective_n_jobs(cpu_count() // 2) # Half CPUs

182

183

# Use in Parallel configuration

184

optimal_jobs = min(len(data_batches), cpu_count())

185

results = Parallel(n_jobs=optimal_jobs)(tasks)

186

```

187

188

### Configuration Context Managers

189

190

Context managers for configuring parallel execution settings globally or locally.

191

192

```python { .api }

193

class parallel_config:

194

def __init__(self, backend=None, *, n_jobs=None, verbose=0, temp_folder=None,

195

max_nbytes="1M", mmap_mode="r", prefer=None, require=None,

196

inner_max_num_threads=None, **backend_params):

197

"""

198

Context manager to configure parallel execution globally.

199

200

Parameters:

201

- backend: str or None, default backend for Parallel objects (None uses system default)

202

- n_jobs: int or None, default number of jobs (None uses system default)

203

- verbose: int, default verbosity level (default: 0)

204

- temp_folder: str or None, default temporary folder (None uses system default)

205

- max_nbytes: str or int, default memory threshold (default: "1M")

206

- mmap_mode: str, default memory mapping mode (default: "r")

207

- prefer: str or None, backend preference hint (None uses system default)

208

- require: str or None, backend requirement (None uses system default)

209

- inner_max_num_threads: int or None, maximum threads for inner parallelism

210

- **backend_params: additional backend parameters

211

"""

212

213

class parallel_backend(parallel_config):

214

def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None, **backend_params):

215

"""

216

Context manager to change default parallel backend.

217

218

Parameters:

219

- backend: str or backend instance, parallel backend to use

220

- n_jobs: int, number of jobs for this backend

221

- inner_max_num_threads: int, thread limit for inner parallelism

222

- **backend_params: additional backend-specific parameters

223

"""

224

```

225

226

**Usage Examples:**

227

228

```python

229

from joblib import Parallel, delayed, parallel_config, parallel_backend

230

231

# Global configuration

232

with parallel_config(backend='multiprocessing', n_jobs=4, verbose=10):

233

# All Parallel calls use these settings

234

result1 = Parallel()(delayed(func)(i) for i in range(10))

235

result2 = Parallel()(delayed(func)(i) for i in range(20, 30))

236

237

# Backend-specific configuration

238

with parallel_backend('threading', n_jobs=2):

239

result = Parallel()(delayed(io_bound_task)(i) for i in range(10))

240

241

with parallel_backend('multiprocessing', n_jobs=4):

242

result = Parallel()(delayed(cpu_bound_task)(i) for i in range(10))

243

244

# Nested configuration

245

with parallel_config(verbose=10):

246

with parallel_backend('loky', n_jobs=4):

247

result = Parallel()(delayed(func)(i) for i in range(100))

248

```

249

250

### Backend Registration

251

252

Register custom parallel execution backends for specialized computing environments.

253

254

```python { .api }

255

def register_parallel_backend(name, factory, make_default=False):

256

"""

257

Register a new parallel backend factory.

258

259

Parameters:

260

- name: str, backend name identifier

261

- factory: callable, factory function returning backend instance

262

- make_default: bool, whether to make this the default backend

263

264

Raises:

265

ValueError: If name already exists and factory is different

266

"""

267

```

268

269

**Usage Examples:**

270

271

```python

272

from joblib import register_parallel_backend, parallel_backend, Parallel, delayed

273

from joblib._parallel_backends import ParallelBackendBase

274

275

class CustomBackend(ParallelBackendBase):

276

"""Custom parallel backend implementation."""

277

278

def effective_n_jobs(self, n_jobs):

279

return min(n_jobs, 8) # Limit to 8 jobs

280

281

def submit(self, func, callback=None):

282

# Custom job submission logic

283

pass

284

285

def retrieve_result(self, futures, timeout=None):

286

# Custom result retrieval logic

287

pass

288

289

# Register custom backend

290

register_parallel_backend('custom', CustomBackend)

291

292

# Use custom backend

293

with parallel_backend('custom'):

294

results = Parallel()(delayed(func)(i) for i in range(10))

295

296

# Register external backend (e.g., Ray)

297

def create_ray_backend(**kwargs):

298

from ray.util.joblib import register_ray

299

return register_ray()

300

301

register_parallel_backend('ray', create_ray_backend)

302

```

303

304

## Advanced Parallel Patterns

305

306

### Error Handling and Debugging

307

308

```python

309

from joblib import Parallel, delayed

310

311

def may_fail(x):

312

if x == 5:

313

raise ValueError(f"Failed on {x}")

314

return x ** 2

315

316

# Sequential execution for debugging

317

results = Parallel(n_jobs=1)(delayed(may_fail)(i) for i in range(10))

318

319

# Verbose output for monitoring

320

results = Parallel(n_jobs=4, verbose=50)(delayed(may_fail)(i) for i in range(10))

321

```

322

323

### Memory Management with Large Data

324

325

```python

326

import numpy as np

327

from joblib import Parallel, delayed

328

329

def process_large_array(arr):

330

return np.mean(arr)

331

332

# Automatic memory mapping for large arrays

333

large_arrays = [np.random.random(1000000) for _ in range(10)]

334

335

results = Parallel(

336

n_jobs=4,

337

max_nbytes='1G', # Trigger memory mapping above 1GB

338

mmap_mode='r', # Read-only memory mapping

339

temp_folder='/fast-storage/tmp'

340

)(delayed(process_large_array)(arr) for arr in large_arrays)

341

```

342

343

### Backend Selection Strategies

344

345

```python

346

import requests
import numpy as np
from joblib import Parallel, delayed

347

348

# I/O bound tasks - use threading

349

def download_file(url):

350

return requests.get(url).content

351

352

urls = ['http://example.com/file{}.txt'.format(i) for i in range(10)]

353

results = Parallel(n_jobs=4, prefer='threads')(

354

delayed(download_file)(url) for url in urls

355

)

356

357

# CPU bound tasks - use processes

358

def cpu_intensive(data):

359

return np.fft.fft(data)

360

361

data_batches = [np.random.random(10000) for _ in range(10)]

362

results = Parallel(n_jobs=4, prefer='processes')(

363

delayed(cpu_intensive)(batch) for batch in data_batches

364

)

365

366

# Shared memory requirement

367

def shared_memory_task(shared_array, index):

368

return shared_array[index] * 2

369

370

results = Parallel(n_jobs=4, require='sharedmem')(

371

delayed(shared_memory_task)(array, i) for i in range(len(array))

372

)

373

```