0
# Exception Handling
1
2
Locust's exception system provides control flow mechanisms for test execution, response validation, and error handling across users, tasks, and distributed testing scenarios.
3
4
## Capabilities
5
6
### Core Exceptions
7
8
Base exception classes for general Locust errors and response handling.
9
10
```python { .api }
11
from locust.exception import LocustError, ResponseError, CatchResponseError
12
13
class LocustError(Exception):
14
"""
15
Base exception class for all Locust-specific errors.
16
17
Use as base class for custom Locust exceptions or catch
18
to handle any Locust-related error.
19
"""
20
21
class ResponseError(Exception):
22
"""
23
Exception for HTTP response-related errors.
24
25
Raised when response validation fails or response
26
contains unexpected data or status codes.
27
"""
28
29
class CatchResponseError(Exception):
30
"""
31
Exception for response validation errors in catch_response context.
32
33
Used within response context managers to indicate
34
validation failures or response processing errors.
35
"""
36
```
37
38
### Control Flow Exceptions
39
40
Exceptions for controlling task and user execution flow during load tests.
41
42
```python { .api }
43
from locust.exception import InterruptTaskSet, StopUser, RescheduleTask, RescheduleTaskImmediately
44
45
class InterruptTaskSet(Exception):
46
"""
47
Exception to interrupt current TaskSet and return to parent.
48
49
Raises this exception to immediately exit current TaskSet
50
and return control to parent TaskSet or User.
51
52
Args:
53
reschedule (bool): Whether to reschedule interrupted task
54
"""
55
56
def __init__(self, reschedule=True):
57
self.reschedule = reschedule
58
59
class StopUser(Exception):
60
"""
61
Exception to stop user execution completely.
62
63
Raises this exception to immediately stop the current user
64
and remove it from the test execution pool.
65
"""
66
67
class RescheduleTask(Exception):
68
"""
69
Exception to reschedule current task for later execution.
70
71
Raises this exception to skip current task execution
72
and reschedule it to run again later.
73
"""
74
75
class RescheduleTaskImmediately(Exception):
76
"""
77
Exception to reschedule current task for immediate re-execution.
78
79
Raises this exception to immediately restart the current
80
task without waiting for normal task scheduling.
81
"""
82
```
83
84
### Configuration Exceptions
85
86
Exceptions for missing or invalid configuration in user and TaskSet classes.
87
88
```python { .api }
89
from locust.exception import MissingWaitTimeError
90
91
class MissingWaitTimeError(Exception):
92
"""
93
Exception raised when wait_time is not configured on User or TaskSet.
94
95
Raised when a User or TaskSet class doesn't specify a wait_time
96
function and attempts to execute tasks.
97
"""
98
```
99
100
### RPC Communication Exceptions
101
102
Exceptions for distributed testing communication errors between master and workers.
103
104
```python { .api }
105
from locust.exception import RPCError, RPCSendError, RPCReceiveError
106
107
class RPCError(Exception):
108
"""
109
Base exception for RPC communication errors in distributed testing.
110
111
Base class for all RPC-related errors between master
112
and worker nodes in distributed test scenarios.
113
"""
114
115
class RPCSendError(RPCError):
116
"""
117
Exception for RPC message sending errors.
118
119
Raised when master cannot send messages to workers
120
or workers cannot send messages to master.
121
"""
122
123
class RPCReceiveError(RPCError):
124
"""
125
Exception for RPC message receiving errors.
126
127
Raised when master or worker fails to receive
128
or process incoming RPC messages.
129
"""
130
```
131
132
### Runner Exceptions
133
134
Exceptions for test runner configuration and management errors.
135
136
```python { .api }
137
from locust.exception import RunnerAlreadyExistsError
138
139
class RunnerAlreadyExistsError(Exception):
140
"""
141
Exception raised when attempting to create duplicate runner.
142
143
Raised when trying to create a new runner instance
144
when one already exists in the environment.
145
"""
146
```
147
148
## Usage Examples
149
150
### Task Flow Control
151
152
```python
153
from locust import HttpUser, TaskSet, task, between
154
from locust.exception import InterruptTaskSet, StopUser, RescheduleTask
155
import random
156
157
class ShoppingFlow(TaskSet):
158
def on_start(self):
159
# Login required for shopping flow
160
response = self.client.post("/login", json={
161
"username": "testuser",
162
"password": "secret"
163
})
164
165
if response.status_code != 200:
166
# Login failed, interrupt this TaskSet
167
print("Login failed, interrupting shopping flow")
168
raise InterruptTaskSet()
169
170
self.auth_token = response.json().get("token")
171
if not self.auth_token:
172
raise InterruptTaskSet()
173
174
@task(3)
175
def browse_products(self):
176
response = self.client.get("/products")
177
178
if response.status_code == 503:
179
# Service unavailable, reschedule for later
180
print("Service busy, rescheduling task")
181
raise RescheduleTask()
182
183
if response.status_code == 401:
184
# Authentication expired, stop user
185
print("Authentication expired, stopping user")
186
raise StopUser()
187
188
@task(2)
189
def add_to_cart(self):
190
product_id = random.randint(1, 100)
191
response = self.client.post(f"/cart/add/{product_id}")
192
193
if response.status_code == 429:
194
# Rate limited, try again immediately
195
print("Rate limited, retrying immediately")
196
raise RescheduleTaskImmediately()
197
198
@task(1)
199
def checkout(self):
200
response = self.client.post("/checkout")
201
202
if response.status_code == 200:
203
# Successful checkout, end shopping flow
204
print("Checkout successful, ending shopping flow")
205
raise InterruptTaskSet(reschedule=False)
206
207
class WebUser(HttpUser):
208
wait_time = between(1, 3)
209
tasks = [ShoppingFlow]
210
211
@task
212
def browse_home(self):
213
"""Fallback task when not in TaskSet"""
214
self.client.get("/")
215
```
216
217
### Response Validation with Exceptions
218
219
```python
220
from locust import HttpUser, task, between
221
from locust.exception import ResponseError, CatchResponseError
222
import json
223
224
class APIUser(HttpUser):
225
wait_time = between(0.5, 2)
226
227
@task
228
def api_with_validation(self):
229
"""API call with response validation using exceptions"""
230
231
with self.client.get("/api/data", catch_response=True) as response:
232
try:
233
# Validate response status
234
if response.status_code != 200:
235
raise CatchResponseError(f"Unexpected status: {response.status_code}")
236
237
# Validate response content
238
data = response.json()
239
if "status" not in data:
240
raise CatchResponseError("Missing status field in response")
241
242
if data["status"] != "success":
243
raise CatchResponseError(f"API returned error: {data.get('message', 'Unknown error')}")
244
245
# Validate required fields
246
required_fields = ["id", "name", "timestamp"]
247
for field in required_fields:
248
if field not in data:
249
raise CatchResponseError(f"Missing required field: {field}")
250
251
# Validate data types
252
if not isinstance(data["id"], int):
253
raise CatchResponseError("ID field must be integer")
254
255
# Success - response is valid
256
response.success()
257
258
except (json.JSONDecodeError, CatchResponseError) as e:
259
# Mark response as failed
260
response.failure(f"Validation failed: {str(e)}")
261
262
@task
263
def api_with_custom_validation(self):
264
"""API call with custom validation logic"""
265
266
response = self.client.get("/api/users")
267
268
try:
269
# Custom validation logic
270
self.validate_user_response(response)
271
except ResponseError as e:
272
# Log detailed error information
273
print(f"Response validation failed: {e}")
274
print(f"Response code: {response.status_code}")
275
print(f"Response body: {response.text[:200]}...")
276
277
def validate_user_response(self, response):
278
"""Custom response validation with exceptions"""
279
280
if response.status_code >= 500:
281
raise ResponseError(f"Server error: {response.status_code}")
282
283
if response.status_code == 404:
284
raise ResponseError("Users endpoint not found")
285
286
if not response.headers.get("Content-Type", "").startswith("application/json"):
287
raise ResponseError("Response is not JSON")
288
289
try:
290
users = response.json()
291
except json.JSONDecodeError:
292
raise ResponseError("Invalid JSON in response")
293
294
if not isinstance(users, list):
295
raise ResponseError("Expected user list, got different type")
296
297
if len(users) == 0:
298
raise ResponseError("No users returned")
299
300
# Validate user structure
301
for user in users[:5]: # Check first 5 users
302
if not all(field in user for field in ["id", "name", "email"]):
303
raise ResponseError("User missing required fields")
304
```
305
306
### Error Recovery and Retry Logic
307
308
```python
309
from locust import HttpUser, task, between
310
from locust.exception import RescheduleTask, RescheduleTaskImmediately
311
import time
312
import random
313
314
class ResilientUser(HttpUser):
315
wait_time = between(1, 3)
316
317
def __init__(self, *args, **kwargs):
318
super().__init__(*args, **kwargs)
319
self.retry_count = {}
320
self.error_count = 0
321
322
@task
323
def resilient_api_call(self):
324
"""API call with retry logic using exceptions"""
325
326
task_name = "api_call"
327
max_retries = 3
328
current_retries = self.retry_count.get(task_name, 0)
329
330
try:
331
response = self.client.get("/api/unreliable")
332
333
if response.status_code == 429: # Rate limited
334
if current_retries < max_retries:
335
self.retry_count[task_name] = current_retries + 1
336
print(f"Rate limited, retry {current_retries + 1}/{max_retries}")
337
time.sleep(random.uniform(1, 3)) # Backoff
338
raise RescheduleTaskImmediately()
339
else:
340
print(f"Max retries reached for {task_name}")
341
self.retry_count[task_name] = 0
342
343
elif response.status_code >= 500: # Server error
344
if current_retries < max_retries:
345
self.retry_count[task_name] = current_retries + 1
346
print(f"Server error, scheduling retry {current_retries + 1}/{max_retries}")
347
raise RescheduleTask() # Retry later
348
else:
349
print(f"Server consistently failing, giving up on {task_name}")
350
self.retry_count[task_name] = 0
351
352
elif response.status_code == 200:
353
# Success - reset retry count
354
self.retry_count[task_name] = 0
355
self.error_count = max(0, self.error_count - 1) # Reduce error count
356
357
else:
358
# Other errors
359
self.error_count += 1
360
if self.error_count > 10:
361
print("Too many errors, stopping user")
362
raise StopUser()
363
364
except Exception as e:
365
# Unexpected error
366
print(f"Unexpected error in {task_name}: {e}")
367
self.error_count += 1
368
369
if self.error_count > 5:
370
raise StopUser()
371
```
372
373
### Configuration Validation
374
375
```python
376
from locust import HttpUser, TaskSet, task
377
from locust.exception import MissingWaitTimeError
378
379
class ConfiguredTaskSet(TaskSet):
380
"""TaskSet with configuration validation"""
381
382
def __init__(self, parent):
383
super().__init__(parent)
384
385
# Validate configuration on initialization
386
if not hasattr(self, 'wait_time') and not hasattr(self.user, 'wait_time'):
387
raise MissingWaitTimeError(
388
f"{self.__class__.__name__} must define wait_time or User must have wait_time"
389
)
390
391
if not self.tasks:
392
raise LocustError(f"{self.__class__.__name__} has no tasks defined")
393
394
@task
395
def configured_task(self):
396
self.client.get("/configured")
397
398
class ValidatedUser(HttpUser):
399
# Missing wait_time will cause MissingWaitTimeError
400
tasks = [ConfiguredTaskSet]
401
402
def __init__(self, *args, **kwargs):
403
# Add configuration validation
404
if not hasattr(self, 'host') or not self.host:
405
raise LocustError("Host must be configured for ValidatedUser")
406
407
super().__init__(*args, **kwargs)
408
```
409
410
### Custom Exception Handling
411
412
```python
413
from locust import HttpUser, task, between
414
from locust.exception import LocustError
415
416
class CustomTestError(LocustError):
417
"""Custom exception for specific test scenarios"""
418
419
def __init__(self, message, error_code=None, retry_after=None):
420
super().__init__(message)
421
self.error_code = error_code
422
self.retry_after = retry_after
423
424
class BusinessLogicError(LocustError):
425
"""Exception for business logic validation failures"""
426
pass
427
428
class CustomExceptionUser(HttpUser):
429
wait_time = between(1, 2)
430
431
@task
432
def business_operation(self):
433
"""Operation with custom exception handling"""
434
435
try:
436
response = self.client.post("/api/business-logic", json={
437
"operation": "process_order",
438
"order_id": random.randint(1000, 9999)
439
})
440
441
# Custom validation
442
if response.status_code == 200:
443
data = response.json()
444
445
# Business logic validation
446
if data.get("result") == "invalid_order":
447
raise BusinessLogicError("Order validation failed")
448
449
elif data.get("result") == "rate_limited":
450
retry_after = data.get("retry_after", 5)
451
raise CustomTestError(
452
"Rate limited by business logic",
453
error_code="RATE_LIMITED",
454
retry_after=retry_after
455
)
456
457
elif data.get("result") != "success":
458
raise CustomTestError(f"Unexpected result: {data.get('result')}")
459
460
except BusinessLogicError:
461
# Handle business logic errors
462
print("Business logic validation failed, continuing test")
463
464
except CustomTestError as e:
465
# Handle custom test errors
466
print(f"Custom error: {e}")
467
if e.retry_after:
468
print(f"Waiting {e.retry_after} seconds before retry")
469
time.sleep(e.retry_after)
470
raise RescheduleTaskImmediately()
471
472
except LocustError:
473
# Handle any other Locust errors
474
print("General Locust error occurred")
475
raise # Re-raise for proper handling
476
```
477
478
## Types
479
480
```python { .api }
481
from typing import Optional, Any
482
483
# Base exception types
484
class LocustError(Exception):
485
"""Base Locust exception."""
486
487
class ResponseError(Exception):
488
"""HTTP response error."""
489
490
class CatchResponseError(Exception):
491
"""Response validation context error."""
492
493
# Control flow exception types
494
class InterruptTaskSet(Exception):
495
def __init__(self, reschedule: bool = True): ...
496
497
class StopUser(Exception):
498
"""Stop user execution."""
499
500
class RescheduleTask(Exception):
501
"""Reschedule task for later."""
502
503
class RescheduleTaskImmediately(Exception):
504
"""Reschedule task immediately."""
505
506
# Configuration exceptions
507
class MissingWaitTimeError(Exception):
508
"""Missing wait_time configuration."""
509
510
# RPC exceptions
511
class RPCError(Exception):
512
"""Base RPC error."""
513
514
class RPCSendError(RPCError):
515
"""RPC send error."""
516
517
class RPCReceiveError(RPCError):
518
"""RPC receive error."""
519
520
# Runner exceptions
521
class RunnerAlreadyExistsError(Exception):
522
"""Runner already exists."""
523
```