docs/error-handling.md

# Error Handling

Loky provides comprehensive error handling with specialized exception classes for different failure modes in parallel processing. These exceptions help identify and handle specific error conditions that can occur during parallel execution.

## Capabilities

### Core Exception Classes

Exception classes for handling various error conditions in parallel processing.

```python { .api }
class BrokenProcessPool(Exception):
    """
    Raised when the process pool is in a broken state and cannot execute tasks.

    This exception indicates that the executor has encountered a fatal error
    and cannot continue processing tasks. The executor should be shutdown
    and recreated.
    """

class TerminatedWorkerError(BrokenProcessPool):
    """
    Raised when a worker process terminates unexpectedly.

    This is a subclass of BrokenProcessPool that specifically indicates
    worker process failure. The executor may be able to recover by
    restarting workers.
    """

class ShutdownExecutorError(RuntimeError):
    """
    Raised when attempting to use an executor that has been shutdown.

    This exception occurs when trying to submit tasks to an executor
    that has already been shutdown via the shutdown() method.
    """
```

### Standard Exceptions

Re-exported exceptions from concurrent.futures for convenience.

```python { .api }
# Re-exported from concurrent.futures
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError
```

## Usage Examples

### Handling Broken Process Pool

```python
from loky import get_reusable_executor, BrokenProcessPool
import os
import signal

def problematic_task(x):
    """Task that might crash the worker process."""
    if x == 3:
        # Simulate a worker crash
        os._exit(1)  # Force process termination
    return x * 2

try:
    executor = get_reusable_executor(max_workers=2)

    # Submit tasks that include a problematic one
    futures = [executor.submit(problematic_task, i) for i in range(5)]

    results = []
    for i, future in enumerate(futures):
        try:
            result = future.result(timeout=5)
            results.append(result)
            print(f"Task {i}: {result}")
        except Exception as e:
            print(f"Task {i} failed: {e}")

except BrokenProcessPool as e:
    print(f"Process pool broken: {e}")
    print("Creating new executor...")

    # Create new executor after broken pool
    executor = get_reusable_executor(max_workers=2, kill_workers=True)
    print("New executor created successfully")
```

### Handling Terminated Workers

```python
from loky import ProcessPoolExecutor, TerminatedWorkerError
import time

def memory_intensive_task(size):
    """Task that might cause worker termination due to resource limits."""
    try:
        # Allocate large amount of memory
        data = [0] * (size * 1000000)  # size in millions of integers
        return sum(data[:1000])  # Return small result
    except MemoryError:
        raise MemoryError(f"Cannot allocate {size}M integers")

def handle_worker_termination():
    """Demonstrate handling of terminated worker errors."""
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Submit tasks with increasing memory requirements
        sizes = [1, 10, 100, 1000, 10000]  # Progressively larger

        for size in sizes:
            try:
                future = executor.submit(memory_intensive_task, size)
                result = future.result(timeout=10)
                print(f"Size {size}M: Success ({result})")

            except TerminatedWorkerError as e:
                print(f"Size {size}M: Worker terminated ({e})")
                # Executor may recover automatically

            except MemoryError as e:
                print(f"Size {size}M: Memory error ({e})")

            except Exception as e:
                print(f"Size {size}M: Other error ({e})")

handle_worker_termination()
```

### Handling Shutdown Errors

```python
from loky import ProcessPoolExecutor, ShutdownExecutorError

def task(x):
    return x * 2

# Demonstrate shutdown error handling
executor = ProcessPoolExecutor(max_workers=2)

# Submit and process some tasks
future1 = executor.submit(task, 5)
result1 = future1.result()
print(f"Before shutdown: {result1}")

# Shutdown the executor
executor.shutdown(wait=True)

# Attempt to use shutdown executor
try:
    future2 = executor.submit(task, 10)
    result2 = future2.result()
except ShutdownExecutorError as e:
    print(f"Cannot use shutdown executor: {e}")

# Create new executor for continued processing
new_executor = ProcessPoolExecutor(max_workers=2)
future3 = new_executor.submit(task, 10)
result3 = future3.result()
print(f"With new executor: {result3}")
new_executor.shutdown()
```

### Timeout Handling

```python
from loky import get_reusable_executor, TimeoutError
import time

def slow_task(duration):
    """Task that takes specified duration to complete."""
    time.sleep(duration)
    return f"Completed after {duration} seconds"

def handle_timeouts():
    """Demonstrate timeout error handling."""
    executor = get_reusable_executor(max_workers=2)

    tasks = [
        (1, 3),  # 1 second task, 3 second timeout - should succeed
        (5, 2),  # 5 second task, 2 second timeout - should timeout
        (2, 4),  # 2 second task, 4 second timeout - should succeed
    ]

    for duration, timeout in tasks:
        try:
            future = executor.submit(slow_task, duration)
            result = future.result(timeout=timeout)
            print(f"Task ({duration}s, timeout {timeout}s): {result}")

        except TimeoutError:
            print(f"Task ({duration}s, timeout {timeout}s): Timed out")
            # Task continues running in background

        except Exception as e:
            print(f"Task ({duration}s, timeout {timeout}s): Error - {e}")

handle_timeouts()
```

### Cancellation Handling

```python
from loky import get_reusable_executor, CancelledError
import time

def cancellable_task(task_id, duration):
    """Task that can be cancelled before completion."""
    print(f"Task {task_id} starting (duration: {duration}s)")
    time.sleep(duration)
    print(f"Task {task_id} completed")
    return f"Task {task_id} result"

def handle_cancellation():
    """Demonstrate task cancellation and error handling."""
    executor = get_reusable_executor(max_workers=2)

    # Submit multiple tasks
    futures = []
    for i in range(4):
        future = executor.submit(cancellable_task, i, 3)
        futures.append(future)

    # Cancel some tasks after a short delay
    time.sleep(0.5)
    cancelled_count = 0

    for i, future in enumerate(futures):
        if i % 2 == 1:  # Cancel odd-numbered tasks
            if future.cancel():
                print(f"Successfully cancelled task {i}")
                cancelled_count += 1
            else:
                print(f"Could not cancel task {i} (already running)")

    # Collect results
    for i, future in enumerate(futures):
        try:
            result = future.result(timeout=5)
            print(f"Task {i}: {result}")

        except CancelledError:
            print(f"Task {i}: Was cancelled")

        except TimeoutError:
            print(f"Task {i}: Timed out")

        except Exception as e:
            print(f"Task {i}: Error - {e}")

    print(f"Cancelled {cancelled_count} tasks")

handle_cancellation()
```

### Comprehensive Error Recovery

```python
from loky import get_reusable_executor
from loky import (BrokenProcessPool, TerminatedWorkerError,
                  ShutdownExecutorError, TimeoutError, CancelledError)
import random
import time

def unreliable_task(task_id):
    """Task that randomly fails in different ways."""
    failure_type = random.choice(['success', 'error', 'crash', 'slow'])

    if failure_type == 'success':
        return f"Task {task_id}: Success"
    elif failure_type == 'error':
        raise ValueError(f"Task {task_id}: Intentional error")
    elif failure_type == 'crash':
        import os
        os._exit(1)  # Simulate process crash
    elif failure_type == 'slow':
        time.sleep(10)  # Very slow task
        return f"Task {task_id}: Slow success"

def robust_task_execution(task_ids, max_retries=2):
    """Execute tasks with comprehensive error handling and retries."""
    executor = None
    results = {}

    for task_id in task_ids:
        retries = 0
        success = False

        while not success and retries <= max_retries:
            try:
                # Ensure we have a working executor
                if executor is None:
                    executor = get_reusable_executor(max_workers=2)

                # Submit task with timeout
                future = executor.submit(unreliable_task, task_id)
                result = future.result(timeout=3)

                results[task_id] = result
                success = True
                print(f"✓ {result}")

            except (BrokenProcessPool, TerminatedWorkerError) as e:
                print(f"✗ Task {task_id}: Pool broken ({e})")
                executor = None  # Force new executor creation
                retries += 1

            except TimeoutError:
                print(f"✗ Task {task_id}: Timeout (retry {retries + 1})")
                retries += 1

            except ValueError as e:
                print(f"✗ Task {task_id}: Application error ({e})")
                results[task_id] = f"Error: {e}"
                success = True  # Don't retry application errors

            except Exception as e:
                print(f"✗ Task {task_id}: Unexpected error ({e})")
                retries += 1

        if not success:
            results[task_id] = f"Failed after {max_retries} retries"
            print(f"✗ Task {task_id}: Gave up after {max_retries} retries")

    # Clean up
    if executor:
        executor.shutdown(wait=False)

    return results

# Execute unreliable tasks with error recovery
task_ids = list(range(10))
random.seed(42)  # For reproducible results
results = robust_task_execution(task_ids)

print("\nFinal Results:")
for task_id, result in results.items():
    print(f"Task {task_id}: {result}")
```

## Best Practices

### Error Detection

- Monitor for `BrokenProcessPool` to detect fatal executor errors
- Use `TerminatedWorkerError` to identify worker process failures
- Check for `ShutdownExecutorError` when reusing executor references

### Recovery Strategies

- Recreate executors after `BrokenProcessPool` exceptions
- Implement retry logic for transient failures
- Use timeouts to prevent hanging tasks

### Resource Management

- Always shutdown executors in finally blocks or use context managers
- Monitor system resources to prevent worker termination