or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

apply-functions.mdasync-results.mddashboard-integration.mdexception-handling.mdindex.mdparallel-map.mdperformance-insights.mdutility-functions.mdworker-configuration.mdworkerpool-management.md

exception-handling.mddocs/

0

# Exception Handling

1

2

Exception classes and utilities for handling errors in multiprocessing environments with enhanced traceback formatting. MPIRE provides graceful error propagation and enhanced debugging capabilities for parallel processing scenarios.

3

4

## Capabilities

5

6

### Exception Classes

7

8

Custom exception classes for worker control and error handling.

9

10

```python { .api }

11

class StopWorker(Exception):

12

"""Exception used to kill a worker"""

13

pass

14

15

class InterruptWorker(Exception):

16

"""Exception used to interrupt a worker"""

17

pass

18

19

class CannotPickleExceptionError(Exception):

20

"""Exception used when Pickle has trouble pickling the actual Exception"""

21

pass

22

```

23

24

**StopWorker**: Raise this exception to terminate a worker process immediately.

25

26

**InterruptWorker**: Raise this exception to interrupt a worker's current task.

27

28

**CannotPickleExceptionError**: Raised when an exception cannot be properly serialized for inter-process communication.

29

30

### Traceback Utilities

31

32

Functions for enhancing exception tracebacks and error display.

33

34

```python { .api }

35

def highlight_traceback(traceback_str: str) -> str

36

def remove_highlighting(traceback_str: str) -> str

37

def populate_exception(err_type: type, err_args: Any, err_state: Dict,

38

traceback_str: str) -> Tuple[Exception, Exception]

39

```

40

41

**highlight_traceback**: Add syntax highlighting to traceback strings for better readability in terminals.

42

43

**remove_highlighting**: Remove ANSI escape codes from highlighted traceback strings.

44

45

**populate_exception**: Reconstruct exception objects from serialized data with enhanced traceback information.

46

47

## Usage Examples

48

49

### Basic Exception Handling

50

51

```python

52

from mpire import WorkerPool

53

54

def risky_function(x):

55

if x < 0:

56

raise ValueError(f"Negative value not allowed: {x}")

57

if x == 0:

58

raise ZeroDivisionError("Cannot divide by zero")

59

return 10 / x

60

61

with WorkerPool(n_jobs=2) as pool:

62

test_data = [1, 2, -1, 0, 3, -2]

63

64

try:

65

results = pool.map(risky_function, test_data)

66

except Exception as e:

67

print(f"Map operation failed: {type(e).__name__}: {e}")

68

# MPIRE provides enhanced traceback information

69

import traceback

70

traceback.print_exc()

71

```

72

73

### Worker Control Exceptions

74

75

```python

76

from mpire import WorkerPool

77

from mpire.exception import StopWorker, InterruptWorker

78

79

def controlled_worker_function(x):

80

if x == 999:

81

# Terminate this worker

82

raise StopWorker("Worker received termination signal")

83

84

if x < 0:

85

# Interrupt current task but keep worker alive

86

raise InterruptWorker("Interrupting negative input processing")

87

88

# Normal processing

89

return x * 2

90

91

with WorkerPool(n_jobs=3) as pool:

92

test_data = [1, 2, 3, -1, 4, 999, 5, 6]

93

94

try:

95

results = pool.map(controlled_worker_function, test_data)

96

print("Results:", results)

97

except Exception as e:

98

print(f"Processing interrupted: {e}")

99

```

100

101

### Exception Callbacks

102

103

```python

104

from mpire import WorkerPool

105

import random

106

107

def unreliable_function(x):

108

# Simulate different types of failures

109

failure_type = random.choice(['none', 'value', 'runtime', 'custom'])

110

111

if failure_type == 'value':

112

raise ValueError(f"ValueError for input {x}")

113

elif failure_type == 'runtime':

114

raise RuntimeError(f"RuntimeError for input {x}")

115

elif failure_type == 'custom':

116

raise CustomError(f"CustomError for input {x}")

117

118

return x ** 2

119

120

class CustomError(Exception):

121

"""Custom exception for demonstration"""

122

pass

123

124

def error_handler(exception):

125

"""Handle different types of exceptions"""

126

error_type = type(exception).__name__

127

error_msg = str(exception)

128

129

print(f"Caught {error_type}: {error_msg}")

130

131

# Log or handle specific error types

132

if isinstance(exception, ValueError):

133

print(" → Handling ValueError with retry logic")

134

elif isinstance(exception, RuntimeError):

135

print(" → Handling RuntimeError with fallback")

136

else:

137

print(" → Handling unknown error type")

138

139

with WorkerPool(n_jobs=2) as pool:

140

# Apply async with error callback

141

async_results = []

142

for i in range(10):

143

result = pool.apply_async(

144

unreliable_function,

145

args=(i,),

146

error_callback=error_handler

147

)

148

async_results.append(result)

149

150

# Collect results

151

successful_results = []

152

for async_result in async_results:

153

try:

154

result = async_result.get()

155

successful_results.append(result)

156

except Exception:

157

# Error already handled by callback

158

pass

159

160

print(f"Successful results: {successful_results}")

161

```

162

163

### Enhanced Traceback Information

164

165

```python

166

from mpire import WorkerPool

167

from mpire.exception import highlight_traceback, remove_highlighting

168

169

def nested_function_error(x):

170

"""Function with nested calls to show traceback enhancement"""

171

def level1(val):

172

return level2(val)

173

174

def level2(val):

175

return level3(val)

176

177

def level3(val):

178

if val == 5:

179

raise RuntimeError(f"Deep error at level 3 with value {val}")

180

return val * 10

181

182

return level1(x)

183

184

with WorkerPool(n_jobs=2) as pool:

185

try:

186

results = pool.map(nested_function_error, range(10))

187

except Exception as e:

188

print("Exception caught with enhanced traceback:")

189

190

# Get the traceback as string

191

import traceback

192

tb_str = traceback.format_exc()

193

194

# Highlight the traceback (MPIRE does this automatically)

195

highlighted_tb = highlight_traceback(tb_str)

196

print(highlighted_tb)

197

198

# Or remove highlighting for logging

199

clean_tb = remove_highlighting(highlighted_tb)

200

print("\nClean traceback for logging:")

201

print(clean_tb)

202

```

203

204

### Timeout Exception Handling

205

206

```python

207

from mpire import WorkerPool

208

import time

209

210

def slow_function(duration):

211

time.sleep(duration)

212

return f"Completed after {duration} seconds"

213

214

with WorkerPool(n_jobs=2) as pool:

215

test_data = [0.5, 1.0, 5.0, 0.1, 10.0] # Some will timeout

216

217

try:

218

results = pool.map(

219

slow_function,

220

test_data,

221

task_timeout=2.0 # 2 second timeout

222

)

223

print("All tasks completed:", results)

224

except TimeoutError as e:

225

print(f"Timeout occurred: {e}")

226

print("Some tasks exceeded the 2-second limit")

227

except Exception as e:

228

print(f"Other error occurred: {type(e).__name__}: {e}")

229

```

230

231

### Custom Exception Handling in Workers

232

233

```python

234

from mpire import WorkerPool

235

236

class DataValidationError(Exception):

237

"""Custom exception for data validation"""

238

def __init__(self, message, data_item, validation_rule):

239

super().__init__(message)

240

self.data_item = data_item

241

self.validation_rule = validation_rule

242

243

def validate_and_process(data_item):

244

"""Function with custom validation and error handling"""

245

246

# Validation rules

247

if not isinstance(data_item, (int, float)):

248

raise DataValidationError(

249

f"Data must be numeric, got {type(data_item).__name__}",

250

data_item,

251

"type_check"

252

)

253

254

if data_item < 0:

255

raise DataValidationError(

256

f"Data must be non-negative, got {data_item}",

257

data_item,

258

"range_check"

259

)

260

261

if data_item > 1000:

262

raise DataValidationError(

263

f"Data must be <= 1000, got {data_item}",

264

data_item,

265

"upper_bound_check"

266

)

267

268

# Process valid data

269

return data_item ** 0.5

270

271

def custom_error_callback(exception):

272

"""Handle custom validation errors"""

273

if isinstance(exception, DataValidationError):

274

print(f"Validation failed: {exception}")

275

print(f" Data item: {exception.data_item}")

276

print(f" Rule: {exception.validation_rule}")

277

else:

278

print(f"Unexpected error: {type(exception).__name__}: {exception}")

279

280

# Test data with various validation issues

281

test_data = [4, 9, 16, -1, "invalid", 25, 1001, 36, None, 49]

282

283

with WorkerPool(n_jobs=2) as pool:

284

successful_results = []

285

286

for i, item in enumerate(test_data):

287

try:

288

result = pool.apply(

289

validate_and_process,

290

args=(item,),

291

error_callback=custom_error_callback

292

)

293

successful_results.append(result)

294

print(f"✓ Item {i}: {item} → {result}")

295

except DataValidationError as e:

296

print(f"✗ Item {i}: Validation failed - {e.validation_rule}")

297

except Exception as e:

298

print(f"✗ Item {i}: Unexpected error - {type(e).__name__}")

299

300

print(f"\nProcessed {len(successful_results)} items successfully")

301

print(f"Valid results: {successful_results}")

302

```

303

304

### Exception Propagation in Worker State

305

306

```python

307

from mpire import WorkerPool

308

309

def init_worker_with_validation(worker_state):

310

"""Initialize worker with validation"""

311

try:

312

# Simulate resource initialization that might fail

313

worker_state['resource'] = initialize_resource()

314

worker_state['initialized'] = True

315

except Exception as e:

316

worker_state['initialized'] = False

317

worker_state['init_error'] = str(e)

318

raise RuntimeError(f"Worker initialization failed: {e}")

319

320

def initialize_resource():

321

"""Simulate resource initialization"""

322

import random

323

if random.random() < 0.2: # 20% chance of failure

324

raise ConnectionError("Failed to connect to resource")

325

return "resource_handle"

326

327

def process_with_state_validation(worker_state, item):

328

"""Process item with state validation"""

329

if not worker_state.get('initialized', False):

330

raise RuntimeError("Worker not properly initialized")

331

332

# Use the initialized resource

333

resource = worker_state['resource']

334

return f"Processed {item} with {resource}"

335

336

with WorkerPool(n_jobs=3, use_worker_state=True) as pool:

337

try:

338

results = pool.map(

339

process_with_state_validation,

340

range(20),

341

worker_init=init_worker_with_validation,

342

chunk_size=5

343

)

344

print("All items processed successfully")

345

print(f"Results: {results[:5]}...") # Show first 5 results

346

347

except Exception as e:

348

print(f"Processing failed: {type(e).__name__}: {e}")

349

print("This could be due to worker initialization failure")

350

```