CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-locust

Developer-friendly load testing framework for HTTP and other protocols with distributed testing capabilities.

Pending
Overview
Eval results
Files

docs/exceptions.md

Exception Handling

Locust's exception system provides control flow mechanisms for test execution, response validation, and error handling across users, tasks, and distributed testing scenarios.

Capabilities

Core Exceptions

Base exception classes for general Locust errors and response handling.

from locust.exception import LocustError, ResponseError, CatchResponseError

class LocustError(Exception):
    """
    Base exception class for Locust-specific errors.

    Use it as the base class for custom Locust exceptions. Note that the
    other exception classes listed in this document derive directly from
    Exception (or from RPCError), so ``except LocustError`` does not
    catch them.
    """

class ResponseError(Exception):
    """
    Exception for HTTP response-related errors.

    Raise it when response validation fails or when the response
    contains unexpected data or status codes.
    """

class CatchResponseError(Exception):
    """
    Exception for response validation errors in a catch_response context.

    Used inside response context managers to signal validation
    failures or response processing errors.
    """

Control Flow Exceptions

Exceptions for controlling task and user execution flow during load tests.

from locust.exception import InterruptTaskSet, StopUser, RescheduleTask, RescheduleTaskImmediately

class InterruptTaskSet(Exception):
    """
    Exception to interrupt the current TaskSet and return to its parent.

    Raise this exception to immediately exit the current TaskSet
    and return control to the parent TaskSet or User.

    Args:
        reschedule (bool): Whether to reschedule the interrupted task.
            Stored on the instance as ``reschedule``; defaults to True.
    """

    def __init__(self, reschedule=True):
        # Kept as a plain attribute so the code handling the exception can read it.
        self.reschedule = reschedule

class StopUser(Exception):
    """
    Exception to stop user execution completely.

    Raise this exception to immediately stop the current user
    and remove it from the test execution pool.
    """

class RescheduleTask(Exception):
    """
    Exception to reschedule the current task for later execution.

    Raise this exception to skip the current task execution
    and reschedule it to run again later.
    """

class RescheduleTaskImmediately(Exception):
    """
    Exception to reschedule the current task for immediate re-execution.

    Raise this exception to immediately restart the current
    task without waiting for normal task scheduling.
    """

Configuration Exceptions

Exceptions for missing or invalid configuration in user and TaskSet classes.

from locust.exception import MissingWaitTimeError

class MissingWaitTimeError(Exception):
    """
    Exception raised when wait_time is not configured on a User or TaskSet.

    Raised when a User or TaskSet class does not specify a wait_time
    function and attempts to execute tasks.
    """

RPC Communication Exceptions

Exceptions for distributed testing communication errors between master and workers.

from locust.exception import RPCError, RPCSendError, RPCReceiveError

class RPCError(Exception):
    """
    Base exception for RPC communication errors in distributed testing.

    Parent class of RPCSendError and RPCReceiveError, which cover errors
    between master and worker nodes in distributed test scenarios.
    """

class RPCSendError(RPCError):
    """
    Exception for RPC message sending errors.

    Raised when the master cannot send messages to workers
    or workers cannot send messages to the master.
    """

class RPCReceiveError(RPCError):
    """
    Exception for RPC message receiving errors.

    Raised when the master or a worker fails to receive
    or process incoming RPC messages.
    """

Runner Exceptions

Exceptions for test runner configuration and management errors.

from locust.exception import RunnerAlreadyExistsError

class RunnerAlreadyExistsError(Exception):
    """
    Exception raised when attempting to create a duplicate runner.

    Raised when trying to create a new runner instance
    while one already exists in the environment.
    """

Usage Examples

Task Flow Control

from locust import HttpUser, TaskSet, task, between
from locust.exception import InterruptTaskSet, StopUser, RescheduleTask
import random

class ShoppingFlow(TaskSet):
    """Shopping scenario demonstrating Locust's control-flow exceptions.

    Logs in when the TaskSet starts, then browses products, adds items to
    the cart and checks out. Depending on the HTTP status code, tasks raise
    InterruptTaskSet, RescheduleTask, RescheduleTaskImmediately or StopUser.
    """

    def on_start(self):
        """Log in and keep the auth token; interrupt the TaskSet on failure."""
        # Login required for shopping flow
        response = self.client.post("/login", json={
            "username": "testuser",
            "password": "secret"
        })

        if response.status_code != 200:
            # Login failed, interrupt this TaskSet
            print("Login failed, interrupting shopping flow")
            raise InterruptTaskSet()

        self.auth_token = response.json().get("token")
        if not self.auth_token:
            # A 200 response without a token is treated like a failed login
            raise InterruptTaskSet()

    @task(3)
    def browse_products(self):
        """Fetch the product list; reschedule on 503, stop the user on 401."""
        response = self.client.get("/products")

        if response.status_code == 503:
            # Service unavailable, reschedule for later
            print("Service busy, rescheduling task")
            raise RescheduleTask()

        if response.status_code == 401:
            # Authentication expired, stop user
            print("Authentication expired, stopping user")
            raise StopUser()

    @task(2)
    def add_to_cart(self):
        """Add a random product to the cart; retry immediately on 429."""
        # The snippet's module-level import does not include this name,
        # so bring it into scope here to avoid a NameError.
        from locust.exception import RescheduleTaskImmediately

        product_id = random.randint(1, 100)
        response = self.client.post(f"/cart/add/{product_id}")

        if response.status_code == 429:
            # Rate limited, try again immediately
            print("Rate limited, retrying immediately")
            raise RescheduleTaskImmediately()

    @task(1)
    def checkout(self):
        """Check out; on success leave the TaskSet without rescheduling."""
        response = self.client.post("/checkout")

        if response.status_code == 200:
            # Successful checkout, end shopping flow
            print("Checkout successful, ending shopping flow")
            raise InterruptTaskSet(reschedule=False)

class WebUser(HttpUser):
    """HTTP user that runs ShoppingFlow and a simple home-page task."""

    # Wait-time strategy built with between(1, 3)
    wait_time = between(1, 3)
    tasks = [ShoppingFlow]
    
    @task
    def browse_home(self):
        """Fallback task when not in TaskSet"""
        self.client.get("/")

Response Validation with Exceptions

from locust import HttpUser, task, between
from locust.exception import ResponseError, CatchResponseError
import json

class APIUser(HttpUser):
    """HTTP user demonstrating response validation with exceptions.

    ``api_with_validation`` validates inside a ``catch_response`` context
    and reports the outcome through ``response.success()`` /
    ``response.failure()``. ``api_with_custom_validation`` delegates to
    ``validate_user_response`` and only prints diagnostics on failure.
    """

    wait_time = between(0.5, 2)

    @task
    def api_with_validation(self):
        """API call with response validation using exceptions"""

        with self.client.get("/api/data", catch_response=True) as response:
            try:
                # Validate response status
                if response.status_code != 200:
                    raise CatchResponseError(f"Unexpected status: {response.status_code}")

                # Validate response content
                data = response.json()
                if "status" not in data:
                    raise CatchResponseError("Missing status field in response")

                if data["status"] != "success":
                    raise CatchResponseError(f"API returned error: {data.get('message', 'Unknown error')}")

                # Validate required fields
                required_fields = ["id", "name", "timestamp"]
                for field in required_fields:
                    if field not in data:
                        raise CatchResponseError(f"Missing required field: {field}")

                # Validate data types
                if not isinstance(data["id"], int):
                    raise CatchResponseError("ID field must be integer")

                # Success - response is valid
                response.success()

            except (ValueError, CatchResponseError) as e:
                # ValueError covers every JSON decode error response.json()
                # can raise (json.JSONDecodeError is a ValueError subclass),
                # regardless of which JSON backend requests uses.
                # Mark response as failed
                response.failure(f"Validation failed: {str(e)}")

    @task
    def api_with_custom_validation(self):
        """API call with custom validation logic"""

        response = self.client.get("/api/users")

        try:
            # Custom validation logic
            self.validate_user_response(response)
        except ResponseError as e:
            # Log detailed error information
            print(f"Response validation failed: {e}")
            print(f"Response code: {response.status_code}")
            print(f"Response body: {response.text[:200]}...")

    def validate_user_response(self, response):
        """Validate a ``/api/users`` response.

        Args:
            response: Response object exposing ``status_code``, ``headers``
                and ``json()``.

        Raises:
            ResponseError: On a 5xx or 404 status, a non-JSON content type,
                an undecodable body, a non-list or empty payload, or when
                one of the first five users is not a mapping containing
                ``id``, ``name`` and ``email``.
        """

        if response.status_code >= 500:
            raise ResponseError(f"Server error: {response.status_code}")

        if response.status_code == 404:
            raise ResponseError("Users endpoint not found")

        if not response.headers.get("Content-Type", "").startswith("application/json"):
            raise ResponseError("Response is not JSON")

        try:
            users = response.json()
        except ValueError as exc:
            # Chain the decode error so the original cause stays visible
            raise ResponseError("Invalid JSON in response") from exc

        if not isinstance(users, list):
            raise ResponseError("Expected user list, got different type")

        if not users:
            raise ResponseError("No users returned")

        # Validate user structure
        required_fields = ("id", "name", "email")
        for user in users[:5]:  # Check first 5 users
            # Require a mapping: a plain string would otherwise pass the
            # membership test through substring matching.
            if not isinstance(user, dict) or not all(field in user for field in required_fields):
                raise ResponseError("User missing required fields")

Error Recovery and Retry Logic

from locust import HttpUser, task, between
from locust.exception import RescheduleTask, RescheduleTaskImmediately
import time
import random

class ResilientUser(HttpUser):
    """HTTP user demonstrating retry and error-recovery logic.

    Tracks a per-task retry counter and an overall error counter, and uses
    RescheduleTask / RescheduleTaskImmediately / StopUser to drive retries.
    """

    wait_time = between(1, 3)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.retry_count = {}  # task name -> retries used so far
        self.error_count = 0

    @task
    def resilient_api_call(self):
        """API call with retry logic using exceptions"""
        # The snippet's module-level import does not include StopUser,
        # so bring it into scope here to avoid a NameError.
        from locust.exception import StopUser

        task_name = "api_call"
        max_retries = 3
        current_retries = self.retry_count.get(task_name, 0)

        try:
            response = self.client.get("/api/unreliable")

            if response.status_code == 429:  # Rate limited
                if current_retries < max_retries:
                    self.retry_count[task_name] = current_retries + 1
                    print(f"Rate limited, retry {current_retries + 1}/{max_retries}")
                    time.sleep(random.uniform(1, 3))  # Backoff
                    raise RescheduleTaskImmediately()
                else:
                    print(f"Max retries reached for {task_name}")
                    self.retry_count[task_name] = 0

            elif response.status_code >= 500:  # Server error
                if current_retries < max_retries:
                    self.retry_count[task_name] = current_retries + 1
                    print(f"Server error, scheduling retry {current_retries + 1}/{max_retries}")
                    raise RescheduleTask()  # Retry later
                else:
                    print(f"Server consistently failing, giving up on {task_name}")
                    self.retry_count[task_name] = 0

            elif response.status_code == 200:
                # Success - reset retry count
                self.retry_count[task_name] = 0
                self.error_count = max(0, self.error_count - 1)  # Reduce error count

            else:
                # Other errors
                self.error_count += 1
                if self.error_count > 10:
                    print("Too many errors, stopping user")
                    raise StopUser()

        except (RescheduleTask, RescheduleTaskImmediately, StopUser):
            # Control-flow signals raised above are intentional; let them
            # propagate instead of being counted as unexpected errors by
            # the generic handler below.
            raise

        except Exception as e:
            # Unexpected error
            print(f"Unexpected error in {task_name}: {e}")
            self.error_count += 1

            if self.error_count > 5:
                raise StopUser() from e

Configuration Validation

from locust import HttpUser, TaskSet, task
from locust.exception import MissingWaitTimeError

class ConfiguredTaskSet(TaskSet):
    """TaskSet with configuration validation"""

    def __init__(self, parent):
        """Initialise the TaskSet and validate its configuration.

        Raises:
            MissingWaitTimeError: If neither this TaskSet nor its user has
                a ``wait_time`` attribute.
            LocustError: If no tasks are defined.
        """
        # The snippet's module-level import only brings in
        # MissingWaitTimeError, so import LocustError here to avoid a
        # NameError on the "no tasks" path.
        from locust.exception import LocustError

        super().__init__(parent)

        # Validate configuration on initialization
        # NOTE(review): if the TaskSet/User base classes already define a
        # wait_time attribute, this hasattr check can never fire - confirm
        # against the installed Locust version.
        if not hasattr(self, 'wait_time') and not hasattr(self.user, 'wait_time'):
            raise MissingWaitTimeError(
                f"{self.__class__.__name__} must define wait_time or User must have wait_time"
            )

        if not self.tasks:
            raise LocustError(f"{self.__class__.__name__} has no tasks defined")

    @task
    def configured_task(self):
        """Request the ``/configured`` endpoint."""
        self.client.get("/configured")

class ValidatedUser(HttpUser):
    """HTTP user that validates its host before initialising."""

    # No wait_time is set here on purpose: ConfiguredTaskSet's constructor
    # checks for it and raises MissingWaitTimeError when it is absent.
    tasks = [ConfiguredTaskSet]

    def __init__(self, *args, **kwargs):
        """Fail fast with LocustError when no host is configured."""
        # The snippet's module-level import does not include LocustError,
        # so bring it into scope here to avoid a NameError.
        from locust.exception import LocustError

        # Add configuration validation (a missing, None or empty host all fail)
        if not getattr(self, 'host', None):
            raise LocustError("Host must be configured for ValidatedUser")

        super().__init__(*args, **kwargs)

Custom Exception Handling

from locust import HttpUser, task, between
from locust.exception import LocustError

class CustomTestError(LocustError):
    """Test-scenario error carrying an optional error code and retry hint.

    Args:
        message: Text passed on to the base exception.
        error_code: Optional identifier stored as ``error_code``.
        retry_after: Optional value stored as ``retry_after``.
    """

    def __init__(self, message, error_code=None, retry_after=None):
        # Record the extra context, then let the base class keep the message.
        self.retry_after = retry_after
        self.error_code = error_code
        super().__init__(message)

class BusinessLogicError(LocustError):
    """Raised when a business-logic validation check fails."""

class CustomExceptionUser(HttpUser):
    """HTTP user demonstrating handling of custom LocustError subclasses."""

    wait_time = between(1, 2)

    @task
    def business_operation(self):
        """Operation with custom exception handling"""
        # This snippet's module-level imports do not include these names,
        # so bring them into scope here to avoid a NameError.
        import random
        import time

        from locust.exception import RescheduleTaskImmediately

        try:
            response = self.client.post("/api/business-logic", json={
                "operation": "process_order",
                "order_id": random.randint(1000, 9999)
            })

            # Custom validation
            if response.status_code == 200:
                data = response.json()

                # Business logic validation
                if data.get("result") == "invalid_order":
                    raise BusinessLogicError("Order validation failed")

                elif data.get("result") == "rate_limited":
                    retry_after = data.get("retry_after", 5)
                    raise CustomTestError(
                        "Rate limited by business logic",
                        error_code="RATE_LIMITED",
                        retry_after=retry_after
                    )

                elif data.get("result") != "success":
                    raise CustomTestError(f"Unexpected result: {data.get('result')}")

        except BusinessLogicError:
            # Handle business logic errors
            print("Business logic validation failed, continuing test")

        except CustomTestError as e:
            # Handle custom test errors
            print(f"Custom error: {e}")
            if e.retry_after:
                print(f"Waiting {e.retry_after} seconds before retry")
                time.sleep(e.retry_after)
                raise RescheduleTaskImmediately()

        except LocustError:
            # Handle any other Locust errors
            print("General Locust error occurred")
            raise  # Re-raise for proper handling

Types

from typing import Optional, Any

# Base exception types
class LocustError(Exception):
    """Base class for Locust-specific exceptions."""

class ResponseError(Exception):
    """HTTP response-related error."""

class CatchResponseError(Exception):
    """Response validation error inside a catch_response context."""

# Control flow exception types
class InterruptTaskSet(Exception):
    """Interrupt the current TaskSet; ``reschedule`` defaults to True."""

    def __init__(self, reschedule: bool = True) -> None: ...

class StopUser(Exception):
    """Stop execution of the current user."""

class RescheduleTask(Exception):
    """Reschedule the current task for later."""

class RescheduleTaskImmediately(Exception):
    """Reschedule the current task immediately."""

# Configuration exceptions
class MissingWaitTimeError(Exception):
    """wait_time is not configured on the User or TaskSet."""

# RPC exceptions
class RPCError(Exception):
    """Base class for RPC errors (parent of RPCSendError and RPCReceiveError)."""

class RPCSendError(RPCError):
    """Error while sending an RPC message."""

class RPCReceiveError(RPCError):
    """Error while receiving an RPC message."""

# Runner exceptions  
class RunnerAlreadyExistsError(Exception):
    """A runner already exists in the environment."""

Install with Tessl CLI

npx tessl i tessl/pypi-locust

docs

contrib.md

debugging.md

events.md

exceptions.md

index.md

load-shapes.md

tasksets.md

user-classes.md

wait-time.md

tile.json