CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-locust

Developer-friendly load testing framework for HTTP and other protocols with distributed testing capabilities.

Pending
Overview
Eval results
Files

docs/events.md

Events System

Locust's event system provides extensibility hooks for custom metrics, logging, integrations, and test lifecycle management. The global events instance offers access to all built-in events for extending Locust functionality.

Capabilities

Global Events Instance

The primary interface for accessing Locust's event system, available as a global instance.

from locust import events

# Global events instance providing access to all event hooks
events: Events

Core Request Events

Events fired when a request completes or when a user raises an unhandled error, used for statistics collection and custom processing.

# Request completion event
events.request.add_listener(handler)

def request_handler(request_type: str, name: str, response_time: float, response_length: int, exception=None, **kwargs) -> None:
    """
    Handle request completion events.

    Signature template for a listener registered on ``events.request``.

    Args:
        request_type (str): HTTP method or custom request type
        name (str): Request name used for statistics grouping
        response_time (float): Response time in milliseconds
        response_length (int): Response body length in bytes
        exception (Exception): Exception if the request failed; defaults to
            None, i.e. no failure was reported
        **kwargs: Additional request metadata
    """

# User error event  
events.user_error.add_listener(handler)

def user_error_handler(user_instance, exception: Exception, tb, **kwargs) -> None:
    """
    Handle user execution errors.

    Signature template for a listener registered on ``events.user_error``.

    Args:
        user_instance: User instance that encountered the error
        exception (Exception): Exception that occurred
        tb: Traceback object for the exception
        **kwargs: Additional error context
    """

Test Lifecycle Events

Events for hooking into test execution phases and managing test lifecycle.

# Test initialization
events.init.add_listener(handler)

def init_handler(environment, **kwargs) -> None:
    """
    Handle test initialization.

    Signature template for a listener registered on ``events.init``.

    Args:
        environment: Locust environment instance
        **kwargs: Additional initialization context
    """

# Test start
events.test_start.add_listener(handler)  

def test_start_handler(environment, **kwargs) -> None:
    """
    Handle the test start event.

    Signature template for a listener registered on ``events.test_start``.

    Args:
        environment: Locust environment instance
        **kwargs: Additional test start context
    """

# Test stopping (before cleanup)
events.test_stopping.add_listener(handler)

def test_stopping_handler(environment, **kwargs) -> None:
    """
    Handle the test stopping event (fired before cleanup).

    Signature template for a listener registered on ``events.test_stopping``.

    Args:
        environment: Locust environment instance
        **kwargs: Additional context
    """

# Test stop (after cleanup)
events.test_stop.add_listener(handler)

def test_stop_handler(environment, **kwargs) -> None:
    """
    Handle the test stop event (fired after cleanup).

    Signature template for a listener registered on ``events.test_stop``.

    Args:
        environment: Locust environment instance
        **kwargs: Additional context
    """

# Test quit (final cleanup)
events.quit.add_listener(handler)
events.quitting.add_listener(handler)

def quitting_handler(environment, **kwargs):
    """
    Handle the ``quitting`` event, fired when the Locust process is exiting.

    Register this on ``events.quitting``. That event supplies the environment
    rather than an exit code, so a listener requiring ``exit_code`` would fail
    with ``TypeError`` when fired.

    Args:
        environment: Locust environment instance
        **kwargs: Additional quit context
    """


def quit_handler(exit_code, **kwargs):
    """
    Handle the ``quit`` event, fired just before the process exits.

    Register this on ``events.quit`` only; use ``quitting_handler`` for
    ``events.quitting``.

    Args:
        exit_code (int): Exit code for the test run
        **kwargs: Additional quit context
    """

User Lifecycle Events

Events related to user spawning and management during test execution.

# User spawning complete
events.spawning_complete.add_listener(handler)

def spawning_complete_handler(user_count: int, **kwargs) -> None:
    """
    Handle user spawning completion.

    Signature template for a listener registered on
    ``events.spawning_complete``.

    Args:
        user_count (int): Total number of spawned users
        **kwargs: Additional spawning context
    """

Statistics Events

Events for custom statistics handling and processing.

# Statistics reset
events.reset_stats.add_listener(handler) 

def reset_stats_handler(**kwargs) -> None:
    """
    Handle the statistics reset event.

    Signature template for a listener registered on ``events.reset_stats``;
    it declares no named arguments.

    Args:
        **kwargs: Reset context
    """

Distributed Testing Events

Events for master/worker coordination in distributed testing scenarios.

# Worker reporting to master
events.report_to_master.add_listener(handler)

def report_to_master_handler(client_id: str, data: dict, **kwargs) -> None:
    """
    Handle a worker's report to the master.

    Signature template for a listener registered on
    ``events.report_to_master``.

    Args:
        client_id (str): Worker client identifier
        data (dict): Report data from the worker
        **kwargs: Additional report context
    """

# Master receiving worker report
events.worker_report.add_listener(handler)

def worker_report_handler(client_id: str, data: dict, **kwargs) -> None:
    """
    Handle a worker report received on the master.

    Signature template for a listener registered on ``events.worker_report``.

    Args:
        client_id (str): Worker client identifier
        data (dict): Report data from the worker
        **kwargs: Additional report context
    """

# Worker connect to master
events.worker_connect.add_listener(handler)

def worker_connect_handler(client_id: str, **kwargs) -> None:
    """
    Handle a worker connecting to the master.

    Signature template for a listener registered on ``events.worker_connect``.

    Args:
        client_id (str): Worker client identifier
        **kwargs: Connection context
    """

System Monitoring Events

Events for system resource monitoring and alerts.

# CPU usage warning
events.cpu_warning.add_listener(handler)

def cpu_warning_handler(usage_percent=None, *, environment=None, cpu_usage=None, **kwargs):
    """
    Handle CPU usage warnings.

    Event data is delivered to listeners as keyword arguments, so the values
    arrive under the names used when the event is fired (``environment`` and
    ``cpu_usage``). A required ``usage_percent`` parameter would make the
    listener fail with ``TypeError``; it is now optional and kept only so
    existing direct callers keep working.

    Args:
        usage_percent (float): Legacy alias for ``cpu_usage``; not supplied
            when the event is fired
        environment: Locust environment instance
        cpu_usage (float): CPU usage percentage
        **kwargs: Additional system context
    """

# Heartbeat events (distributed testing)
events.heartbeat_sent.add_listener(handler)
events.heartbeat_received.add_listener(handler)

def heartbeat_handler(client_id: str, **kwargs) -> None:
    """
    Handle heartbeat events.

    Signature template shared by listeners registered on
    ``events.heartbeat_sent`` and ``events.heartbeat_received``.

    Args:
        client_id (str): Client identifier
        **kwargs: Heartbeat context
    """

# Usage monitoring
events.usage_monitor.add_listener(handler)

def usage_monitor_handler(stats=None, *, environment=None, cpu_usage=None, memory_usage=None, **kwargs):
    """
    Handle usage monitoring events.

    Event data is delivered to listeners as keyword arguments, so the values
    arrive under the names used when the event is fired (``environment``,
    ``cpu_usage`` and ``memory_usage``). A required ``stats`` parameter would
    make the listener fail with ``TypeError``; it is now optional and kept
    only so existing direct callers keep working.

    Args:
        stats (dict): Legacy usage-statistics argument; not supplied when the
            event is fired
        environment: Locust environment instance
        cpu_usage (float): Current CPU usage percentage
        memory_usage (int): Current memory usage in bytes
        **kwargs: Monitoring context
    """

Command Line Events

Events for customizing command line argument parsing.

# Command line parser initialization
events.init_command_line_parser.add_listener(handler)

def init_parser_handler(parser, **kwargs) -> None:
    """
    Handle command line parser initialization.

    Signature template for a listener registered on
    ``events.init_command_line_parser``.

    Args:
        parser: ArgumentParser instance
        **kwargs: Parser initialization context
    """

Event Classes

Events Class

Main events manager class for creating custom event systems.

class Events:
    """
    Event system manager providing event hooks.

    Each attribute below is an event hook; listeners are attached with the
    hook's ``add_listener`` method (e.g. ``events.request.add_listener``).

    Attributes:
        request: Request completion event hook
        user_error: User error event hook
        report_to_master: Worker to master reporting hook
        worker_report: Master receiving worker report hook
        worker_connect: Worker connection hook
        spawning_complete: User spawning completion hook
        quitting: Test quitting hook
        quit: Test quit hook
        init: Test initialization hook
        init_command_line_parser: CLI parser init hook
        test_start: Test start hook
        test_stopping: Test stopping hook (before cleanup)
        test_stop: Test stop hook (after cleanup)
        reset_stats: Statistics reset hook
        cpu_warning: CPU usage warning hook
        heartbeat_sent: Heartbeat sent hook
        heartbeat_received: Heartbeat received hook
        usage_monitor: Usage monitoring hook
    """

EventHook Class

Individual event hook implementation for subscribing and firing events.

class EventHook:
    """
    Individual event hook for managing event listeners.

    Listeners are plain callables; they are attached with ``add_listener``,
    detached with ``remove_listener`` and invoked through ``fire``.
    """

    def add_listener(self, func) -> None:
        """
        Add an event listener function.

        Args:
            func (callable): Event handler function
        """

    def remove_listener(self, func) -> None:
        """
        Remove an event listener function.

        Args:
            func (callable): Event handler function to remove
        """

    def fire(self, **kwargs) -> None:
        """
        Fire the event to all registered listeners.

        Event data is accepted as keyword arguments only, which is why the
        handler templates in this document end with ``**kwargs``.

        Args:
            **kwargs: Event data to pass to listeners
        """

Usage Examples

Custom Metrics Collection

from locust import HttpUser, task, between, events
import time
import json

# Custom metrics storage shared by the request and test_stop listeners
custom_metrics = {
    "api_calls": 0,
    "error_count": 0,
    "response_times": [],
}

def on_request(request_type, name, response_time, response_length, exception=None, **kwargs):
    """
    Collect custom metrics on each request.

    Args:
        request_type (str): HTTP method or custom request type
        name (str): Request name for statistics grouping
        response_time (float): Response time in milliseconds; may be None
            when no timing was measured
        response_length (int): Response body length in bytes
        exception (Exception): Exception if the request failed, else None
        **kwargs: Additional request metadata
    """
    custom_metrics["api_calls"] += 1

    if exception:
        custom_metrics["error_count"] += 1

    # A request without a measured time still counts as a call, but it cannot
    # contribute to the average or to the slow-request check.
    if response_time is None:
        return
    custom_metrics["response_times"].append(response_time)

    # Log slow requests
    if response_time > 5000:  # 5 seconds
        print(f"SLOW REQUEST: {name} took {response_time}ms")

def on_test_stop(environment, **kwargs):
    """
    Report custom metrics at test end and save them to ``custom_metrics.json``.

    Averages and rates fall back to 0.0 when nothing was recorded, so stopping
    a test before any request completes does not raise ``ZeroDivisionError``.

    Args:
        environment: Locust environment instance (unused)
        **kwargs: Additional context
    """
    total_calls = custom_metrics["api_calls"]
    response_times = custom_metrics["response_times"]
    avg_response_time = sum(response_times) / len(response_times) if response_times else 0.0
    error_rate = custom_metrics["error_count"] / total_calls * 100 if total_calls else 0.0

    print("\n--- Custom Metrics ---")
    print(f"Total API calls: {total_calls}")
    print(f"Average response time: {avg_response_time:.2f}ms")
    print(f"Error rate: {error_rate:.2f}%")

    # Save to file
    with open("custom_metrics.json", "w", encoding="utf-8") as f:
        json.dump(custom_metrics, f, indent=2)

# Register event listeners: on_request is attached to the request hook and
# on_test_stop to the test_stop hook
events.request.add_listener(on_request)
events.test_stop.add_listener(on_test_stop)

class APIUser(HttpUser):
    """Simulated user whose single task requests ``/api/data``."""

    # Wait time between tasks, configured with between(1, 3)
    wait_time = between(1, 3)

    @task
    def api_call(self):
        """Send a GET request to ``/api/data`` through the user's HTTP client."""
        self.client.get("/api/data")

External System Integration

from locust import HttpUser, task, between, events
import requests
import json
import time

class ExternalReporter:
    """
    Forward test lifecycle and request-error events to an external webhook.

    Attributes are plain instance state:
        webhook_url: Target URL for webhook POSTs
        test_start_time: ``time.time()`` of the last test start, or None if no
            test start has been observed yet
        request_count: Number of requests seen since the last test start
    """

    def __init__(self, webhook_url):
        """
        Args:
            webhook_url (str): URL that receives the JSON webhook payloads
        """
        self.webhook_url = webhook_url
        self.test_start_time = None
        self.request_count = 0

    def on_test_start(self, environment, **kwargs):
        """Notify external system of test start and begin a fresh request count."""
        self.test_start_time = time.time()
        # Start counting from zero so repeated runs in one process do not
        # accumulate into a single total.
        self.request_count = 0
        payload = {
            "event": "test_start",
            "timestamp": self.test_start_time,
            "users": environment.runner.target_user_count if environment.runner else 0
        }
        self.send_webhook(payload)

    def on_request(self, request_type, name, response_time, response_length, exception=None, **kwargs):
        """
        Track request metrics and send an alert for every failed request.

        Args:
            request_type (str): HTTP method or custom request type
            name (str): Request name for statistics grouping
            response_time (float): Response time in milliseconds
            response_length (int): Response body length in bytes
            exception (Exception): Exception if the request failed, else None
            **kwargs: Additional request metadata
        """
        self.request_count += 1

        # Send alerts for errors
        if exception:
            payload = {
                "event": "error",
                "timestamp": time.time(),
                "request_type": request_type,
                "name": name,
                "error": str(exception)
            }
            self.send_webhook(payload)

    def on_test_stop(self, environment, **kwargs):
        """
        Send final test report.

        The duration is reported as 0.0 when no test start was observed (for
        example when the listener was registered after the test began), rather
        than failing on ``None`` arithmetic.
        """
        now = time.time()
        duration = now - self.test_start_time if self.test_start_time is not None else 0.0
        payload = {
            "event": "test_complete",
            "timestamp": now,
            "duration": duration,
            "total_requests": self.request_count,
            # NOTE(review): confirm serialize_stats() does not reset the
            # collected statistics as a side effect before relying on it here.
            "stats": environment.stats.serialize_stats() if environment.stats else {}
        }
        self.send_webhook(payload)

    def send_webhook(self, payload):
        """
        Send webhook notification.

        Delivery is best-effort: any failure is printed and swallowed so that
        a reporting problem cannot interrupt the load test.
        """
        try:
            requests.post(self.webhook_url, json=payload, timeout=5)
        except Exception as e:
            print(f"Webhook failed: {e}")

# Initialize external reporter with the webhook endpoint that receives events
reporter = ExternalReporter("https://monitoring.example.com/webhook")

# Register event listeners: bound methods are used so each listener shares
# the reporter's state
events.test_start.add_listener(reporter.on_test_start)
events.request.add_listener(reporter.on_request)
events.test_stop.add_listener(reporter.on_test_stop)

class WebUser(HttpUser):
    """Simulated user whose single task requests the site root."""

    # Wait time between tasks, configured with between(1, 2)
    wait_time = between(1, 2)

    @task
    def browse(self):
        """Send a GET request to ``/`` through the user's HTTP client."""
        self.client.get("/")

Custom Statistics and Reporting

from locust import HttpUser, task, events
import csv
import time
from collections import defaultdict

class DetailedStats:
    """
    Accumulate per-request details and report them when the test stops.

    State kept on the instance:
        start_time: ``time.time()`` at construction, the origin for minutes
        requests_by_minute: minute index -> list of request records
        errors_by_type: exception class name -> occurrence count
        response_time_buckets: bucket label -> request count
    """

    def __init__(self):
        self.start_time = time.time()
        self.requests_by_minute = defaultdict(list)
        self.errors_by_type = defaultdict(int)
        self.response_time_buckets = defaultdict(int)

    def on_request(self, request_type, name, response_time, response_length, exception, **kwargs):
        """Record one request: per-minute entry, error tally and time bucket."""
        # Whole minutes elapsed since the collector was created
        elapsed_minutes = int((time.time() - self.start_time) // 60)

        record = {
            "type": request_type,
            "name": name,
            "response_time": response_time,
            "length": response_length,
            "success": exception is None
        }
        self.requests_by_minute[elapsed_minutes].append(record)

        # Tally failures under the exception's class name
        if exception:
            self.errors_by_type[type(exception).__name__] += 1

        # Pick the first bucket whose exclusive upper bound exceeds the
        # response time; anything at or above the last bound is ">5s"
        bucket_label = ">5s"
        for upper_bound, label in (
            (100, "<100ms"),
            (500, "100-500ms"),
            (1000, "500ms-1s"),
            (5000, "1-5s"),
        ):
            if response_time < upper_bound:
                bucket_label = label
                break
        self.response_time_buckets[bucket_label] += 1

    def on_test_stop(self, environment, **kwargs):
        """Write ``detailed_stats.csv`` and print bucket and error summaries."""
        header = ["Minute", "Request_Type", "Name", "Response_Time", "Length", "Success"]
        with open("detailed_stats.csv", "w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(header)

            # One CSV row per recorded request, grouped by minute
            for minute, entries in self.requests_by_minute.items():
                writer.writerows(
                    [
                        minute, entry["type"], entry["name"],
                        entry["response_time"], entry["length"], entry["success"]
                    ]
                    for entry in entries
                )

        print("\n--- Response Time Distribution ---")
        for bucket, count in self.response_time_buckets.items():
            print(f"{bucket}: {count} requests")

        print("\n--- Error Summary ---")
        for error_type, count in self.errors_by_type.items():
            print(f"{error_type}: {count} occurrences")

# Initialize detailed stats collector and attach its bound methods as listeners
stats_collector = DetailedStats()
events.request.add_listener(stats_collector.on_request)
events.test_stop.add_listener(stats_collector.on_test_stop)

class AnalyticsUser(HttpUser):
    """Simulated user with two weighted tasks: page views and API calls."""

    # NOTE(review): `between` is not in this snippet's `from locust import ...`
    # line; import it when running the snippet on its own.
    wait_time = between(0.5, 2)

    # Weight 3 versus weight 1 on api_call below
    @task(3)
    def page_view(self):
        """Send a GET request to ``/page``."""
        self.client.get("/page")

    @task(1)
    def api_call(self):
        """Send a GET request to ``/api/data``."""
        self.client.get("/api/data")

Environment-Specific Event Handling

from locust import HttpUser, task, events
import os

def setup_environment_specific_handlers():
    """
    Register exactly one listener, chosen by the ``TEST_ENV`` variable.

    ``production`` attaches an error forwarder to ``events.user_error``,
    ``staging`` attaches an alerting listener to ``events.request``, and any
    other value (default ``development``) attaches a verbose request logger.
    """
    target_env = os.getenv("TEST_ENV", "development")

    def prod_error_handler(user_instance, exception, tb, **kwargs):
        # Production: minimal logging, forward to external monitoring.
        # NOTE(review): send_to_monitoring_service is not defined in this
        # snippet; it must be supplied by the surrounding project.
        send_to_monitoring_service(str(exception))

    def staging_request_handler(request_type, name, response_time, response_length, exception, **kwargs):
        # Staging: report failures and anything slower than 2000 ms
        if exception or response_time > 2000:
            print(f"STAGING ALERT: {name} - {response_time}ms - {exception}")

    def dev_request_handler(**kwargs):
        # Development: dump every request's event data
        print(f"DEV REQUEST: {kwargs}")

    if target_env == "production":
        events.user_error.add_listener(prod_error_handler)
    elif target_env == "staging":
        events.request.add_listener(staging_request_handler)
    else:
        events.request.add_listener(dev_request_handler)

# Setup environment-specific handling once, at import time of the locustfile
setup_environment_specific_handlers()

class EnvironmentUser(HttpUser):
    """Simulated user whose single task requests ``/api/environment``."""

    # NOTE(review): `between` is not in this snippet's `from locust import ...`
    # line; import it when running the snippet on its own.
    wait_time = between(1, 3)

    @task
    def environment_test(self):
        """Send a GET request to ``/api/environment``."""
        self.client.get("/api/environment")

Types

from typing import Callable, Any, Dict, Optional
from locust.env import Environment

# Event handler function types
# NOTE(review): EventHook.fire takes event data as **kwargs, so these
# positional Callable shapes are approximations of keyword-called handlers;
# they list the named arguments in documentation order and omit **kwargs.
# (request_type, name, response_time, response_length, exception)
RequestHandler = Callable[[str, str, float, int, Optional[Exception]], None]
# (user_instance, exception, tb)
UserErrorHandler = Callable[[Any, Exception, Any], None]  
# (environment)
LifecycleHandler = Callable[[Environment], None]
# (stats)
StatisticsHandler = Callable[[Dict[str, Any]], None]
SystemHandler = Callable[[float], None]  # For CPU warnings, etc.

# Generic event handler: any arguments, no return value
EventHandler = Callable[..., None]

# Event hook interface
class EventHook:
    """Typed interface of an event hook: attach, detach and fire listeners."""

    # Register a listener on this hook
    def add_listener(self, func: EventHandler) -> None: ...
    # Unregister a previously added listener
    def remove_listener(self, func: EventHandler) -> None: ...
    # Fire the event; event data is passed as keyword arguments
    def fire(self, **kwargs: Any) -> None: ...

Install with Tessl CLI

npx tessl i tessl/pypi-locust

docs

contrib.md

debugging.md

events.md

exceptions.md

index.md

load-shapes.md

tasksets.md

user-classes.md

wait-time.md

tile.json