or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

apply-functions.mdasync-results.mddashboard-integration.mdexception-handling.mdindex.mdparallel-map.mdperformance-insights.mdutility-functions.mdworker-configuration.mdworkerpool-management.md

utility-functions.mddocs/

# Utility Functions

Utility functions for task chunking, CPU affinity management, timing operations, and other helper functionality. These utilities provide fine-grained control over multiprocessing performance and resource management.

## Capabilities

### CPU and System Utilities

Functions for managing CPU resources and system information.

```python { .api }
def cpu_count() -> int

def set_cpu_affinity(pid: int, mask: List[int]) -> None
```

**cpu_count**: Get the number of available CPU cores (imported from multiprocessing).

**set_cpu_affinity**: Set CPU affinity for a process to specific CPU cores.

- `pid` (int): Process ID to set affinity for
- `mask` (List[int]): List of CPU core IDs to bind the process to

### Task Chunking Utilities

Functions for optimizing task distribution and chunking strategies.

```python { .api }
def chunk_tasks(iterable_of_args: Iterable, iterable_len: Optional[int] = None,
                n_jobs: Optional[int] = None, n_tasks_per_job: Optional[int] = None,
                chunk_size: Optional[int] = None, n_splits: Optional[int] = None) -> Generator

def apply_numpy_chunking(iterable_of_args: Iterable, iterable_len: Optional[int] = None,
                         n_jobs: Optional[int] = None, n_tasks_per_job: Optional[int] = None,
                         chunk_size: Optional[int] = None, n_splits: Optional[int] = None) -> Generator

def get_n_chunks(iterable_of_args: Iterable, iterable_len: Optional[int] = None,
                 chunk_size: Optional[int] = None, n_jobs: Optional[int] = None,
                 n_tasks_per_job: Optional[int] = None, n_splits: Optional[int] = None) -> int
```

**chunk_tasks**: Split an iterable into optimally-sized chunks for parallel processing.

**apply_numpy_chunking**: Apply numpy-specific chunking optimizations for array processing.

**get_n_chunks**: Calculate the optimal number of chunks for given parameters.

### Argument Processing

Functions for preparing arguments for parallel processing.

```python { .api }
def make_single_arguments(iterable_of_args: Iterable, generator: bool = True) -> Union[List, Generator]
```

**make_single_arguments**: Convert multi-argument tuples to single arguments for functions expecting individual parameters.

### Time and Formatting Utilities

Functions for timing operations and formatting output.

```python { .api }
def format_seconds(seconds: Optional[Union[int, float]], with_milliseconds: bool) -> str

class TimeIt:
    def __init__(self, label: str = "Operation") -> None
    def __enter__(self) -> 'TimeIt'
    def __exit__(self, exc_type, exc_val, exc_tb) -> None
```

**format_seconds**: Format seconds into human-readable time strings.

**TimeIt**: Context manager for timing code blocks with automatic reporting.

### Manager and Communication Utilities

Functions for creating multiprocessing managers and communication objects.

```python { .api }
def create_sync_manager(use_dill: bool) -> SyncManager

class NonPickledSyncManager:
    """Synchronization manager that doesn't require pickling"""
    pass
```

**create_sync_manager**: Create a multiprocessing SyncManager with optional dill support.

**NonPickledSyncManager**: Alternative sync manager for scenarios where pickling is problematic.

## Usage Examples

### CPU Affinity Management

```python
from mpire import WorkerPool
from mpire.utils import set_cpu_affinity
import os
import time

def cpu_intensive_task(x):
    """CPU-bound task that benefits from CPU pinning"""
    # Show which CPU the process is running on
    pid = os.getpid()
    print(f"Process {pid} processing {x}")

    # CPU-intensive computation
    result = 0
    for i in range(x * 100000):
        result += i
    return result

# Pin workers to specific CPUs
cpu_assignments = [0, 1, 2, 3]  # Use first 4 CPUs

with WorkerPool(n_jobs=4, cpu_ids=cpu_assignments) as pool:
    results = pool.map(cpu_intensive_task, range(8))
    print(f"Results: {results}")

# Manual CPU affinity setting
def set_process_affinity():
    pid = os.getpid()
    set_cpu_affinity(pid, [0, 2])  # Pin to CPUs 0 and 2
    print(f"Process {pid} pinned to CPUs 0 and 2")

set_process_affinity()
```

### Task Chunking Optimization

```python
from mpire import WorkerPool
from mpire.utils import chunk_tasks, get_n_chunks
import time

def quick_task(x):
    """Fast task that benefits from larger chunks"""
    return x * 2

def slow_task(x):
    """Slow task that benefits from smaller chunks"""
    time.sleep(0.01)
    return x ** 2

# Analyze chunking for different scenarios
data = range(1000)

print("=== Chunking Analysis ===")

# Quick tasks - use larger chunks to reduce overhead
quick_chunks = list(chunk_tasks(data, n_jobs=4, n_tasks_per_job=50))
print(f"Quick task chunks: {len(quick_chunks)} chunks")
print(f"Chunk sizes: {[len(chunk) for chunk in quick_chunks[:5]]}...")

# Slow tasks - use smaller chunks for better load balancing
slow_chunks = list(chunk_tasks(data, n_jobs=4, n_tasks_per_job=10))
print(f"Slow task chunks: {len(slow_chunks)} chunks")
print(f"Chunk sizes: {[len(chunk) for chunk in slow_chunks[:5]]}...")

# Calculate optimal chunk count
optimal_chunks = get_n_chunks(data, n_jobs=4, n_tasks_per_job=25)
print(f"Optimal chunk count: {optimal_chunks}")

# Test with WorkerPool
with WorkerPool(n_jobs=4) as pool:
    # Quick tasks with large chunks
    start_time = time.time()
    results1 = pool.map(quick_task, data, chunk_size=50)
    quick_time = time.time() - start_time

    # Slow tasks with small chunks
    start_time = time.time()
    results2 = pool.map(slow_task, range(100), chunk_size=5)
    slow_time = time.time() - start_time

    print(f"Quick tasks time: {quick_time:.2f}s")
    print(f"Slow tasks time: {slow_time:.2f}s")
```

### Numpy Array Chunking

```python
import numpy as np
from mpire import WorkerPool
from mpire.utils import apply_numpy_chunking

def process_array_chunk(chunk):
    """Process a numpy array chunk"""
    # Simulate array processing
    return np.sum(chunk ** 2)

# Create large numpy array
large_array = np.random.rand(10000)

print("=== Numpy Chunking ===")

# Apply numpy-specific chunking
chunks = list(apply_numpy_chunking(large_array, n_jobs=4, chunk_size=1000))
print(f"Created {len(chunks)} chunks")
print(f"Chunk shapes: {[chunk.shape for chunk in chunks[:3]]}...")

# Process with WorkerPool
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(process_array_chunk, chunks)
    total_result = sum(results)
    print(f"Total processing result: {total_result:.2f}")

# Compare with direct numpy processing
direct_result = np.sum(large_array ** 2)
print(f"Direct numpy result: {direct_result:.2f}")
print(f"Results match: {abs(total_result - direct_result) < 1e-10}")
```

### Argument Processing

```python
from mpire import WorkerPool
from mpire.utils import make_single_arguments

def multi_arg_function(a, b, c):
    """Function that expects multiple arguments"""
    return a + b * c

def single_arg_function(args):
    """Function that expects a single tuple argument"""
    a, b, c = args
    return a + b * c

# Original data as tuples
multi_arg_data = [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)]

with WorkerPool(n_jobs=2) as pool:
    # Method 1: Use starmap-like functionality (MPIRE handles this automatically)
    results1 = pool.map(multi_arg_function, multi_arg_data)
    print(f"Multi-arg results: {results1}")

    # Method 2: Convert to single arguments if needed
    single_args = make_single_arguments(multi_arg_data, generator=False)
    results2 = pool.map(single_arg_function, single_args)
    print(f"Single-arg results: {results2}")

    # Verify results are the same
    print(f"Results match: {results1 == results2}")
```

### Timing Operations

```python
from mpire import WorkerPool
from mpire.utils import TimeIt, format_seconds
import time

def timed_operation(duration):
    """Operation with known duration"""
    time.sleep(duration)
    return f"Slept for {duration} seconds"

# Time individual operations
with TimeIt("Single operation"):
    result = timed_operation(0.5)

# Time parallel operations
with TimeIt("Parallel operations"):
    with WorkerPool(n_jobs=3) as pool:
        results = pool.map(timed_operation, [0.2, 0.3, 0.4, 0.1, 0.2])

# Manual timing with formatting
start_time = time.time()
with WorkerPool(n_jobs=2) as pool:
    results = pool.map(timed_operation, [0.1] * 10)
elapsed_time = time.time() - start_time

formatted_time = format_seconds(elapsed_time, with_milliseconds=True)
print(f"Manual timing: {formatted_time}")

# Timing with different formatting options
test_times = [0.001, 0.1, 1.5, 65.3, 3661.7]
for t in test_times:
    with_ms = format_seconds(t, with_milliseconds=True)
    without_ms = format_seconds(t, with_milliseconds=False)
    print(f"{t:8.3f}s -> With MS: {with_ms:>15} | Without MS: {without_ms:>10}")
```

### Custom Sync Manager Usage

```python
from mpire import WorkerPool
from mpire.utils import create_sync_manager, NonPickledSyncManager
import multiprocessing

def worker_with_shared_dict(shared_dict, worker_id, items):
    """Worker that updates a shared dictionary"""
    for item in items:
        shared_dict[f"worker_{worker_id}_item_{item}"] = item ** 2
    return len(items)

# Example 1: Standard sync manager
print("=== Standard Sync Manager ===")
with create_sync_manager(use_dill=False) as manager:
    shared_dict = manager.dict()

    with WorkerPool(n_jobs=3, shared_objects=shared_dict) as pool:
        results = pool.map(
            worker_with_shared_dict,
            [(0, [1, 2, 3]), (1, [4, 5, 6]), (2, [7, 8, 9])],
            pass_worker_id=False  # Pass worker_id manually in args
        )

    print(f"Processed items: {sum(results)}")
    print(f"Shared dict contents: {dict(shared_dict)}")

# Example 2: Dill-enabled sync manager (for complex objects)
print("\n=== Dill Sync Manager ===")
try:
    with create_sync_manager(use_dill=True) as manager:
        # Create shared objects that might need dill
        shared_list = manager.list()
        shared_dict = manager.dict()

        def complex_worker(shared_objects, data):
            shared_list, shared_dict = shared_objects
            # Process complex data types
            shared_list.append(len(data))
            shared_dict[f"len_{len(data)}"] = data
            return sum(data)

        test_data = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]

        with WorkerPool(n_jobs=2, shared_objects=(shared_list, shared_dict)) as pool:
            results = pool.map(complex_worker, test_data)

        print(f"Results: {results}")
        print(f"Shared list: {list(shared_list)}")
        print(f"Shared dict keys: {list(shared_dict.keys())}")

except ImportError:
    print("Dill not available, skipping dill manager example")

# Example 3: Non-pickled manager for special cases
print("\n=== Non-Pickled Manager ===")
def simple_shared_counter():
    """Example using a simple shared counter"""
    counter = multiprocessing.Value('i', 0)

    def increment_counter(shared_counter, increment):
        with shared_counter.get_lock():
            shared_counter.value += increment
            return shared_counter.value

    with WorkerPool(n_jobs=2, shared_objects=counter) as pool:
        results = pool.map(increment_counter, [1, 2, 3, 4, 5])

    print(f"Final counter value: {counter.value}")
    print(f"Worker results: {results}")

simple_shared_counter()
```

### Combined Utility Usage

```python
from mpire import WorkerPool, cpu_count
from mpire.utils import TimeIt, format_seconds, chunk_tasks, set_cpu_affinity
import numpy as np
import time

def comprehensive_utility_example():
    """Example combining multiple utilities"""

    print(f"=== System Information ===")
    print(f"Available CPUs: {cpu_count()}")

    # Generate test data
    data_size = 10000
    test_array = np.random.rand(data_size)

    def array_processing_task(chunk):
        """Process array chunk with timing"""
        start_time = time.time()
        result = np.sum(chunk ** 2) + np.mean(chunk)
        processing_time = time.time() - start_time
        return result, processing_time

    # Optimize chunking strategy
    n_workers = min(4, cpu_count())
    optimal_chunks = list(chunk_tasks(
        test_array,
        n_jobs=n_workers,
        n_tasks_per_job=data_size // (n_workers * 4)
    ))

    print(f"Created {len(optimal_chunks)} chunks for {n_workers} workers")
    print(f"Chunk sizes: {[len(chunk) for chunk in optimal_chunks[:3]]}...")

    # Process with timing and CPU pinning
    with TimeIt("Complete parallel processing"):
        cpu_ids = list(range(min(n_workers, cpu_count())))

        with WorkerPool(n_jobs=n_workers, cpu_ids=cpu_ids, enable_insights=True) as pool:
            results = pool.map(array_processing_task, optimal_chunks)

            # Extract results and timings
            values, timings = zip(*results)
            total_value = sum(values)
            avg_chunk_time = np.mean(timings)

            print(f"Total processing value: {total_value:.4f}")
            print(f"Average chunk processing time: {format_seconds(avg_chunk_time, True)}")

            # Show insights
            pool.print_insights()

    # Compare with serial processing
    with TimeIt("Serial processing"):
        serial_result, serial_time = array_processing_task(test_array)

    print(f"\nComparison:")
    print(f"Parallel result: {total_value:.4f}")
    print(f"Serial result: {serial_result:.4f}")
    print(f"Results match: {abs(total_value - serial_result) < 1e-10}")
    print(f"Serial processing time: {format_seconds(serial_time, True)}")

comprehensive_utility_example()
```